from __future__ import annotations import uuid from datetime import date, datetime import unicodedata from sqlalchemy import delete, select from sqlalchemy.ext.asyncio import AsyncSession from ..models import Movement, Product, TelegramUser from .datetime_utils import NO_EXPIRY_DATE, now_in_config_timezone, today_in_config_timezone from .expiry_rules import estimate_expiry_for_category from . import sheets_backup from .sync_service import enqueue_full_sync, sheets_sync_available def now_iso() -> str: return datetime.utcnow().replace(microsecond=0).isoformat() def normalize_product_payload(payload: dict) -> dict: normalized = dict(payload) today = today_in_config_timezone() category = str(normalized.get("categoria", "") or "").strip().lower() normalized["categoria"] = category normalized["caducidadEstimada"] = bool(normalized.get("caducidadEstimada", False)) expiry = str(normalized.get("fechaCaducidad", "") or "").strip() if not expiry or expiry.lower() in {"no-caduca", "no caduca", "sin-caducidad", "sin caducidad"}: estimated = estimate_expiry_for_category(category) if category else "" if estimated: normalized["fechaCaducidad"] = estimated normalized["caducidadEstimada"] = estimated != NO_EXPIRY_DATE notes = str(normalized.get("notas", "") or "").strip() extra = ( f"Fecha de caducidad estimada por categoria {category}." if estimated != NO_EXPIRY_DATE else "Producto sin caducidad segun categoria." ) normalized["notas"] = f"{notes} {extra}".strip() if notes else extra else: normalized["fechaCaducidad"] = NO_EXPIRY_DATE notes = str(normalized.get("notas", "") or "").strip() extra = "Producto sin caducidad declarada." normalized["notas"] = f"{notes} {extra}".strip() if notes else extra else: normalized["fechaCaducidad"] = expiry ingreso = str(normalized.get("fechaIngreso", "") or "").strip() normalized["fechaIngreso"] = ingreso or today produccion = str(normalized.get("fechaProduccion", "") or "").strip() if not produccion: normalized["fechaProduccion"] = today notes = str(normalized.get("notas", "") or "").strip() extra = "Fecha de produccion asumida como hoy por dato faltante." normalized["notas"] = f"{notes} {extra}".strip() if notes else extra else: normalized["fechaProduccion"] = produccion return normalized def product_to_dict(product: Product) -> dict: return { "id": product.id, "producto": product.producto, "precio": product.precio, "cantidad": product.cantidad, "unidad": product.unidad, "fechaCaducidad": product.fecha_caducidad, "fechaIngreso": product.fecha_ingreso, "fechaProduccion": product.fecha_produccion, "categoria": product.categoria, "caducidadEstimada": bool(product.caducidad_estimada), "notas": product.notas, "fuente": product.fuente, "stockActual": product.stock_actual, "consumidoTotal": product.consumido_total, "createdAt": product.created_at, "updatedAt": product.updated_at, } def movement_to_dict(movement: Movement) -> dict: return { "id": movement.id, "productId": movement.product_id, "producto": movement.producto, "tipo": movement.tipo, "cantidad": movement.cantidad, "unidad": movement.unidad, "notas": movement.notas, "fuente": movement.fuente, "createdAt": movement.created_at, } def normalize_product_key(name: str) -> str: text = unicodedata.normalize("NFKD", name or "").encode("ascii", "ignore").decode("ascii") text = text.lower() text = "".join(char if char.isalnum() or char.isspace() else " " for char in text) return " ".join(text.split()) def aggregate_product_records(records: list[dict]) -> list[dict]: grouped: dict[tuple[str, str, str], dict] = {} for record in records: key = ( normalize_product_key(record.get("producto", "")), str(record.get("fechaCaducidad", "") or ""), str(record.get("unidad", "") or "unidad").lower(), ) current = grouped.get(key) if current is None: grouped[key] = dict(record) continue current["cantidad"] = float(current.get("cantidad", 0) or 0) + float(record.get("cantidad", 0) or 0) current["stockActual"] = float(current.get("stockActual", 0) or 0) + float(record.get("stockActual", 0) or 0) current["consumidoTotal"] = float(current.get("consumidoTotal", 0) or 0) + float(record.get("consumidoTotal", 0) or 0) current["precio"] = max(float(current.get("precio", 0) or 0), float(record.get("precio", 0) or 0)) current["caducidadEstimada"] = bool(current.get("caducidadEstimada", False) or record.get("caducidadEstimada", False)) existing_notes = str(current.get("notas", "") or "").strip() incoming_notes = str(record.get("notas", "") or "").strip() if incoming_notes and incoming_notes not in existing_notes: current["notas"] = f"{existing_notes} | {incoming_notes}".strip(" |") if len(str(record.get("producto", ""))) > len(str(current.get("producto", ""))): current["producto"] = record.get("producto", current["producto"]) if str(record.get("updatedAt", "")) > str(current.get("updatedAt", "")): current["updatedAt"] = record.get("updatedAt", current.get("updatedAt", "")) return sorted( grouped.values(), key=lambda item: ( item.get("fechaCaducidad", ""), normalize_product_key(item.get("producto", "")), ), ) async def register_telegram_user(session: AsyncSession, chat_id: str, username: str = "", first_name: str = "", last_name: str = "") -> None: user = await session.get(TelegramUser, str(chat_id)) if user is None: user = TelegramUser( chat_id=str(chat_id), username=username or "", first_name=first_name or "", last_name=last_name or "", is_active=1, updated_at=now_iso(), ) session.add(user) else: user.username = username or "" user.first_name = first_name or "" user.last_name = last_name or "" user.is_active = 1 user.updated_at = now_iso() await session.commit() async def list_active_users(session: AsyncSession) -> list[TelegramUser]: result = await session.execute(select(TelegramUser).where(TelegramUser.is_active == 1)) return list(result.scalars().all()) async def create_product(session: AsyncSession, payload: dict) -> dict: payload = normalize_product_payload(payload) timestamp = now_iso() product = Product( id=str(uuid.uuid4()), producto=payload["producto"], precio=float(payload["precio"]), cantidad=float(payload["cantidad"]), unidad=payload["unidad"], fecha_caducidad=payload["fechaCaducidad"], fecha_ingreso=payload["fechaIngreso"], fecha_produccion=payload["fechaProduccion"], categoria=payload.get("categoria", ""), caducidad_estimada=1 if payload.get("caducidadEstimada", False) else 0, notas=payload.get("notas", ""), fuente=payload.get("fuente", "web"), stock_actual=float(payload["cantidad"]), consumido_total=0, created_at=timestamp, updated_at=timestamp, ) movement = Movement( id=str(uuid.uuid4()), product_id=product.id, producto=product.producto, tipo="ingreso", cantidad=product.cantidad, unidad=product.unidad, notas=product.notas, fuente=product.fuente, created_at=timestamp, ) session.add(product) session.add(movement) await enqueue_full_sync(session) await session.commit() return product_to_dict(product) async def list_products(session: AsyncSession, query: str = "") -> list[dict]: result = await session.execute(select(Product).order_by(Product.fecha_caducidad.asc(), Product.producto.asc())) products = list(result.scalars().all()) if query: needle = query.lower().strip() products = [p for p in products if needle in p.producto.lower() or needle in p.notas.lower()] return [product_to_dict(product) for product in products] async def list_movements_local(session: AsyncSession) -> list[dict]: result = await session.execute(select(Movement).order_by(Movement.created_at.desc())) return [movement_to_dict(item) for item in result.scalars().all()] async def consume_product(session: AsyncSession, payload: dict) -> dict: result = await session.execute( select(Product) .where(Product.producto.ilike(payload["producto"]), Product.stock_actual > 0) .order_by(Product.fecha_caducidad.asc(), Product.created_at.asc()) .limit(1) ) product = result.scalar_one_or_none() if product is None: raise ValueError(f'No existe stock disponible para "{payload["producto"]}".') if product.unidad.lower() != payload["unidad"].lower(): raise ValueError(f"La unidad esperada para {product.producto} es {product.unidad}.") if product.stock_actual < payload["cantidad"]: raise ValueError(f"Stock insuficiente de {product.producto}.") product.stock_actual -= float(payload["cantidad"]) product.consumido_total += float(payload["cantidad"]) product.updated_at = now_iso() movement = Movement( id=str(uuid.uuid4()), product_id=product.id, producto=product.producto, tipo="consumo", cantidad=float(payload["cantidad"]), unidad=product.unidad, notas=payload.get("notas", ""), fuente=payload.get("fuente", "telegram-consumo"), created_at=product.updated_at, ) session.add(movement) await enqueue_full_sync(session) await session.commit() response = product_to_dict(product) | {"consumedNow": float(payload["cantidad"])} return response async def get_expiring_products(session: AsyncSession, days_ahead: int) -> list[dict]: products = await list_products(session) today = now_in_config_timezone().date() expiring = [] for item in products: if item["stockActual"] <= 0 or not item["fechaCaducidad"]: continue if item["fechaCaducidad"] == NO_EXPIRY_DATE: continue expiry = date.fromisoformat(item["fechaCaducidad"]) diff = (expiry - today).days if 0 <= diff <= days_ahead: expiring.append(item) return aggregate_product_records(expiring) async def list_products_grouped(session: AsyncSession, query: str = "") -> list[dict]: return aggregate_product_records(await list_products(session, query)) async def sync_from_sheets(session: AsyncSession) -> dict: if not sheets_sync_available(): return {"ok": False, "reason": "sheets-disabled"} products_response, movements_response = await sheets_backup.list_products(), await sheets_backup.list_movements() records = products_response.get("records", []) movements = movements_response.get("movements", []) await session.execute(delete(Product)) await session.execute(delete(Movement)) for item in records: session.add( Product( id=item["id"], producto=item["producto"], precio=float(item.get("precio", 0)), cantidad=float(item.get("cantidad", 0)), unidad=item.get("unidad", "unidad"), fecha_caducidad=item.get("fechaCaducidad", ""), fecha_ingreso=item.get("fechaIngreso", ""), fecha_produccion=item.get("fechaProduccion", ""), categoria=item.get("categoria", ""), caducidad_estimada=1 if item.get("caducidadEstimada", False) else 0, notas=item.get("notas", ""), fuente=item.get("fuente", "web"), stock_actual=float(item.get("stockActual", item.get("cantidad", 0))), consumido_total=float(item.get("consumidoTotal", 0)), created_at=item.get("createdAt", now_iso()), updated_at=item.get("updatedAt", item.get("createdAt", now_iso())), ) ) for item in movements: session.add( Movement( id=item["id"], product_id=item.get("productId", ""), producto=item["producto"], tipo=item["tipo"], cantidad=float(item.get("cantidad", 0)), unidad=item.get("unidad", "unidad"), notas=item.get("notas", ""), fuente=item.get("fuente", ""), created_at=item.get("createdAt", now_iso()), ) ) await session.commit() return {"products": len(records), "movements": len(movements)} async def sync_to_sheets(session: AsyncSession) -> dict: if not sheets_sync_available(): return {"ok": False, "reason": "sheets-disabled"} records = await list_products(session) movements = await list_movements_local(session) await sheets_backup.replace_snapshot(records, movements) return {"ok": True, "products": len(records), "movements": len(movements)}