bot_fam / python_backend /app /services /inventory.py
JairoDanielMT's picture
Add memory, pending actions, sync queue, and Lima reminders
2552437
from __future__ import annotations
import uuid
from datetime import date, datetime
import unicodedata
from sqlalchemy import delete, select
from sqlalchemy.ext.asyncio import AsyncSession
from ..models import Movement, Product, TelegramUser
from .datetime_utils import NO_EXPIRY_DATE, now_in_config_timezone, today_in_config_timezone
from .expiry_rules import estimate_expiry_for_category
from . import sheets_backup
from .sync_service import enqueue_full_sync, sheets_sync_available
def now_iso() -> str:
return datetime.utcnow().replace(microsecond=0).isoformat()
def normalize_product_payload(payload: dict) -> dict:
normalized = dict(payload)
today = today_in_config_timezone()
category = str(normalized.get("categoria", "") or "").strip().lower()
normalized["categoria"] = category
normalized["caducidadEstimada"] = bool(normalized.get("caducidadEstimada", False))
expiry = str(normalized.get("fechaCaducidad", "") or "").strip()
if not expiry or expiry.lower() in {"no-caduca", "no caduca", "sin-caducidad", "sin caducidad"}:
estimated = estimate_expiry_for_category(category) if category else ""
if estimated:
normalized["fechaCaducidad"] = estimated
normalized["caducidadEstimada"] = estimated != NO_EXPIRY_DATE
notes = str(normalized.get("notas", "") or "").strip()
extra = (
f"Fecha de caducidad estimada por categoria {category}."
if estimated != NO_EXPIRY_DATE
else "Producto sin caducidad segun categoria."
)
normalized["notas"] = f"{notes} {extra}".strip() if notes else extra
else:
normalized["fechaCaducidad"] = NO_EXPIRY_DATE
notes = str(normalized.get("notas", "") or "").strip()
extra = "Producto sin caducidad declarada."
normalized["notas"] = f"{notes} {extra}".strip() if notes else extra
else:
normalized["fechaCaducidad"] = expiry
ingreso = str(normalized.get("fechaIngreso", "") or "").strip()
normalized["fechaIngreso"] = ingreso or today
produccion = str(normalized.get("fechaProduccion", "") or "").strip()
if not produccion:
normalized["fechaProduccion"] = today
notes = str(normalized.get("notas", "") or "").strip()
extra = "Fecha de produccion asumida como hoy por dato faltante."
normalized["notas"] = f"{notes} {extra}".strip() if notes else extra
else:
normalized["fechaProduccion"] = produccion
return normalized
def product_to_dict(product: Product) -> dict:
return {
"id": product.id,
"producto": product.producto,
"precio": product.precio,
"cantidad": product.cantidad,
"unidad": product.unidad,
"fechaCaducidad": product.fecha_caducidad,
"fechaIngreso": product.fecha_ingreso,
"fechaProduccion": product.fecha_produccion,
"categoria": product.categoria,
"caducidadEstimada": bool(product.caducidad_estimada),
"notas": product.notas,
"fuente": product.fuente,
"stockActual": product.stock_actual,
"consumidoTotal": product.consumido_total,
"createdAt": product.created_at,
"updatedAt": product.updated_at,
}
def movement_to_dict(movement: Movement) -> dict:
return {
"id": movement.id,
"productId": movement.product_id,
"producto": movement.producto,
"tipo": movement.tipo,
"cantidad": movement.cantidad,
"unidad": movement.unidad,
"notas": movement.notas,
"fuente": movement.fuente,
"createdAt": movement.created_at,
}
def normalize_product_key(name: str) -> str:
text = unicodedata.normalize("NFKD", name or "").encode("ascii", "ignore").decode("ascii")
text = text.lower()
text = "".join(char if char.isalnum() or char.isspace() else " " for char in text)
return " ".join(text.split())
def aggregate_product_records(records: list[dict]) -> list[dict]:
grouped: dict[tuple[str, str, str], dict] = {}
for record in records:
key = (
normalize_product_key(record.get("producto", "")),
str(record.get("fechaCaducidad", "") or ""),
str(record.get("unidad", "") or "unidad").lower(),
)
current = grouped.get(key)
if current is None:
grouped[key] = dict(record)
continue
current["cantidad"] = float(current.get("cantidad", 0) or 0) + float(record.get("cantidad", 0) or 0)
current["stockActual"] = float(current.get("stockActual", 0) or 0) + float(record.get("stockActual", 0) or 0)
current["consumidoTotal"] = float(current.get("consumidoTotal", 0) or 0) + float(record.get("consumidoTotal", 0) or 0)
current["precio"] = max(float(current.get("precio", 0) or 0), float(record.get("precio", 0) or 0))
current["caducidadEstimada"] = bool(current.get("caducidadEstimada", False) or record.get("caducidadEstimada", False))
existing_notes = str(current.get("notas", "") or "").strip()
incoming_notes = str(record.get("notas", "") or "").strip()
if incoming_notes and incoming_notes not in existing_notes:
current["notas"] = f"{existing_notes} | {incoming_notes}".strip(" |")
if len(str(record.get("producto", ""))) > len(str(current.get("producto", ""))):
current["producto"] = record.get("producto", current["producto"])
if str(record.get("updatedAt", "")) > str(current.get("updatedAt", "")):
current["updatedAt"] = record.get("updatedAt", current.get("updatedAt", ""))
return sorted(
grouped.values(),
key=lambda item: (
item.get("fechaCaducidad", ""),
normalize_product_key(item.get("producto", "")),
),
)
async def register_telegram_user(session: AsyncSession, chat_id: str, username: str = "", first_name: str = "", last_name: str = "") -> None:
user = await session.get(TelegramUser, str(chat_id))
if user is None:
user = TelegramUser(
chat_id=str(chat_id),
username=username or "",
first_name=first_name or "",
last_name=last_name or "",
is_active=1,
updated_at=now_iso(),
)
session.add(user)
else:
user.username = username or ""
user.first_name = first_name or ""
user.last_name = last_name or ""
user.is_active = 1
user.updated_at = now_iso()
await session.commit()
async def list_active_users(session: AsyncSession) -> list[TelegramUser]:
result = await session.execute(select(TelegramUser).where(TelegramUser.is_active == 1))
return list(result.scalars().all())
async def create_product(session: AsyncSession, payload: dict) -> dict:
payload = normalize_product_payload(payload)
timestamp = now_iso()
product = Product(
id=str(uuid.uuid4()),
producto=payload["producto"],
precio=float(payload["precio"]),
cantidad=float(payload["cantidad"]),
unidad=payload["unidad"],
fecha_caducidad=payload["fechaCaducidad"],
fecha_ingreso=payload["fechaIngreso"],
fecha_produccion=payload["fechaProduccion"],
categoria=payload.get("categoria", ""),
caducidad_estimada=1 if payload.get("caducidadEstimada", False) else 0,
notas=payload.get("notas", ""),
fuente=payload.get("fuente", "web"),
stock_actual=float(payload["cantidad"]),
consumido_total=0,
created_at=timestamp,
updated_at=timestamp,
)
movement = Movement(
id=str(uuid.uuid4()),
product_id=product.id,
producto=product.producto,
tipo="ingreso",
cantidad=product.cantidad,
unidad=product.unidad,
notas=product.notas,
fuente=product.fuente,
created_at=timestamp,
)
session.add(product)
session.add(movement)
await enqueue_full_sync(session)
await session.commit()
return product_to_dict(product)
async def list_products(session: AsyncSession, query: str = "") -> list[dict]:
result = await session.execute(select(Product).order_by(Product.fecha_caducidad.asc(), Product.producto.asc()))
products = list(result.scalars().all())
if query:
needle = query.lower().strip()
products = [p for p in products if needle in p.producto.lower() or needle in p.notas.lower()]
return [product_to_dict(product) for product in products]
async def list_movements_local(session: AsyncSession) -> list[dict]:
result = await session.execute(select(Movement).order_by(Movement.created_at.desc()))
return [movement_to_dict(item) for item in result.scalars().all()]
async def consume_product(session: AsyncSession, payload: dict) -> dict:
result = await session.execute(
select(Product)
.where(Product.producto.ilike(payload["producto"]), Product.stock_actual > 0)
.order_by(Product.fecha_caducidad.asc(), Product.created_at.asc())
.limit(1)
)
product = result.scalar_one_or_none()
if product is None:
raise ValueError(f'No existe stock disponible para "{payload["producto"]}".')
if product.unidad.lower() != payload["unidad"].lower():
raise ValueError(f"La unidad esperada para {product.producto} es {product.unidad}.")
if product.stock_actual < payload["cantidad"]:
raise ValueError(f"Stock insuficiente de {product.producto}.")
product.stock_actual -= float(payload["cantidad"])
product.consumido_total += float(payload["cantidad"])
product.updated_at = now_iso()
movement = Movement(
id=str(uuid.uuid4()),
product_id=product.id,
producto=product.producto,
tipo="consumo",
cantidad=float(payload["cantidad"]),
unidad=product.unidad,
notas=payload.get("notas", ""),
fuente=payload.get("fuente", "telegram-consumo"),
created_at=product.updated_at,
)
session.add(movement)
await enqueue_full_sync(session)
await session.commit()
response = product_to_dict(product) | {"consumedNow": float(payload["cantidad"])}
return response
async def get_expiring_products(session: AsyncSession, days_ahead: int) -> list[dict]:
products = await list_products(session)
today = now_in_config_timezone().date()
expiring = []
for item in products:
if item["stockActual"] <= 0 or not item["fechaCaducidad"]:
continue
if item["fechaCaducidad"] == NO_EXPIRY_DATE:
continue
expiry = date.fromisoformat(item["fechaCaducidad"])
diff = (expiry - today).days
if 0 <= diff <= days_ahead:
expiring.append(item)
return aggregate_product_records(expiring)
async def list_products_grouped(session: AsyncSession, query: str = "") -> list[dict]:
return aggregate_product_records(await list_products(session, query))
async def sync_from_sheets(session: AsyncSession) -> dict:
if not sheets_sync_available():
return {"ok": False, "reason": "sheets-disabled"}
products_response, movements_response = await sheets_backup.list_products(), await sheets_backup.list_movements()
records = products_response.get("records", [])
movements = movements_response.get("movements", [])
await session.execute(delete(Product))
await session.execute(delete(Movement))
for item in records:
session.add(
Product(
id=item["id"],
producto=item["producto"],
precio=float(item.get("precio", 0)),
cantidad=float(item.get("cantidad", 0)),
unidad=item.get("unidad", "unidad"),
fecha_caducidad=item.get("fechaCaducidad", ""),
fecha_ingreso=item.get("fechaIngreso", ""),
fecha_produccion=item.get("fechaProduccion", ""),
categoria=item.get("categoria", ""),
caducidad_estimada=1 if item.get("caducidadEstimada", False) else 0,
notas=item.get("notas", ""),
fuente=item.get("fuente", "web"),
stock_actual=float(item.get("stockActual", item.get("cantidad", 0))),
consumido_total=float(item.get("consumidoTotal", 0)),
created_at=item.get("createdAt", now_iso()),
updated_at=item.get("updatedAt", item.get("createdAt", now_iso())),
)
)
for item in movements:
session.add(
Movement(
id=item["id"],
product_id=item.get("productId", ""),
producto=item["producto"],
tipo=item["tipo"],
cantidad=float(item.get("cantidad", 0)),
unidad=item.get("unidad", "unidad"),
notas=item.get("notas", ""),
fuente=item.get("fuente", ""),
created_at=item.get("createdAt", now_iso()),
)
)
await session.commit()
return {"products": len(records), "movements": len(movements)}
async def sync_to_sheets(session: AsyncSession) -> dict:
if not sheets_sync_available():
return {"ok": False, "reason": "sheets-disabled"}
records = await list_products(session)
movements = await list_movements_local(session)
await sheets_backup.replace_snapshot(records, movements)
return {"ok": True, "products": len(records), "movements": len(movements)}