Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import asyncio | |
| from contextlib import asynccontextmanager, suppress | |
| from pathlib import Path | |
| from difflib import SequenceMatcher | |
| import unicodedata | |
| from fastapi import Depends, FastAPI, Header, HTTPException, Request | |
| from fastapi.responses import FileResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from sqlalchemy import select | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from .config import settings | |
| from .database import SessionLocal, engine, get_session, run_migrations | |
| from .models import Base, Product | |
| from .schemas import ConsumptionCreate, DiagramRequest, ProductCreate, SearchRequest, TextExtractionRequest | |
| from .services.ai import ( | |
| answer_question, | |
| build_inventory_chart, | |
| build_mermaid, | |
| extract_consumption, | |
| extract_product, | |
| extract_products_batch, | |
| extract_products_from_receipt_image, | |
| route_telegram_text, | |
| ) | |
| from .services.datetime_utils import NO_EXPIRY_DATE | |
| from .services.inventory import ( | |
| consume_product, | |
| create_product, | |
| get_expiring_products, | |
| list_products, | |
| list_products_grouped, | |
| register_telegram_user, | |
| sync_from_sheets, | |
| ) | |
| from .services.agent_memory import ( | |
| build_memory_context, | |
| capture_semantic_memory, | |
| clear_pending_action, | |
| get_pending_action, | |
| recall_recent_chat, | |
| remember_episode, | |
| remember_message, | |
| set_pending_action, | |
| ) | |
| from .services.reminders import reminder_loop | |
| from .services.telegram import download_file, get_file_url, guess_photo_media_type, send_message, set_webhook | |
| from .services.sync_service import process_sync_queue_once, sync_loop | |
| from .services.whisper_local import transcribe_audio_bytes | |
@asynccontextmanager
async def lifespan(_app: FastAPI):
    """Run application startup and shutdown around the serving phase.

    Startup creates the ORM tables, runs migrations, optionally bootstraps
    the local database from Sheets when it holds no products yet, and starts
    the reminder and sync background loops. Shutdown cancels both loops and
    waits for them to finish.

    The function is handed to ``FastAPI(lifespan=...)``, which expects an
    async context manager, hence the ``asynccontextmanager`` decorator.
    """
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    # NOTE(review): nesting reconstructed from a flattened listing; assumed
    # that migrations run after the create_all transaction has closed.
    await run_migrations()

    # The initial Sheets bootstrap is best-effort: a failure must not keep
    # the application from starting.
    try:
        async with SessionLocal() as session:
            has_local_data = (await session.execute(select(Product.id).limit(1))).first() is not None
            if settings.bootstrap_from_sheets and not has_local_data:
                await sync_from_sheets(session)
    except Exception as exc:
        print(f"Sync inicial desde Sheets omitido: {exc}")

    background_tasks = (
        asyncio.create_task(reminder_loop(SessionLocal)),
        asyncio.create_task(sync_loop(SessionLocal)),
    )
    try:
        yield
    finally:
        # Request cancellation of every loop first, then wait for each one so
        # none is left running when the application exits.
        for task in background_tasks:
            task.cancel()
        for task in background_tasks:
            with suppress(asyncio.CancelledError):
                await task
# ASGI application; `lifespan` is registered as its startup/shutdown handler.
app = FastAPI(lifespan=lifespan)
# Directory with the static front-end assets, located next to this module.
STATIC_DIR = Path(__file__).parent / "static"
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")
# In-memory buffer of photos keyed by Telegram media group id. Entries are
# added by the webhook handler and popped by process_photo_group_after_delay.
# Being a plain module-level dict, its contents live only in this process.
PHOTO_GROUPS: dict[str, dict] = {}
def looks_like_batch_registration(text: str) -> bool:
    """Guess whether *text* describes several products at once.

    A message counts as a batch when it is longer than 220 characters or
    when at least two of the connective phrases below occur in it.

    The phrases are delimited by spaces so they only match whole words.
    To make that work at the very start or end of the message, and across
    newlines or repeated spaces, the text is whitespace-normalised and
    padded with one space on each side before matching.
    """
    signals = (
        " de ahi ",
        " de ahí ",
        " tambien ",
        " también ",
        " luego ",
        " ademas ",
        " además ",
        " tenemos ",
        " otra ",
        " otro ",
        " y de ",
    )
    padded = f" {' '.join(text.lower().split())} "
    matches = sum(1 for signal in signals if signal in padded)
    return len(text) > 220 or matches >= 2
async def register_batch_and_build_message(
    session: AsyncSession,
    products: list[dict],
    source_label: str,
) -> str:
    """Register every product in *products* and return a summary message.

    Registration is best-effort: a product whose creation fails is skipped
    so the remaining ones can still be stored. Each failure is logged with
    its traceback instead of being discarded silently.

    Args:
        session: Database session passed through to ``create_product``.
        products: Extracted product payloads to register.
        source_label: Origin of the batch, interpolated into the messages.

    Returns:
        A text listing up to 12 of the registered products.

    Raises:
        ValueError: If *products* is empty or no product could be registered.
    """
    import logging  # local import: keeps this block independent of the file's import list

    if not products:
        raise ValueError(
            f"Pude procesar el {source_label}, pero no encontre productos con datos suficientes para registrar."
        )

    logger = logging.getLogger(__name__)
    created_records = []
    for product in products:
        try:
            created_records.append(await create_product(session, product))
        except Exception:
            logger.exception("Skipping product that could not be registered: %r", product)
            continue

    if not created_records:
        raise ValueError(
            f"Procesé el {source_label}, pero no pude registrar productos validos. Intenta mencionar producto, cantidad, precio y fechas."
        )

    lines = [
        f"- {record['producto']}: {record['cantidad']} {record['unidad']}, vence {'sin caducidad' if record['fechaCaducidad'] == NO_EXPIRY_DATE else record['fechaCaducidad']}"
        for record in created_records[:12]
    ]
    # Only the first 12 records are listed; the rest are summarised by count.
    omitted_count = max(0, len(created_records) - 12)
    if omitted_count:
        lines.append(f"- ... y {omitted_count} productos mas")
    return (
        f"Registro por {source_label} completado.\n"
        f"Se registraron {len(created_records)} productos.\n\nResumen:\n" + "\n".join(lines)
    )
def normalize_product_name_for_match(name: str) -> str:
    """Return a canonical form of *name* for fuzzy comparison.

    Accents are stripped (NFKD decomposition, then non-ASCII characters are
    dropped), the text is lower-cased, every character that is neither
    alphanumeric nor whitespace becomes a space, and runs of whitespace are
    collapsed to a single space. A falsy *name* yields an empty string.
    """
    ascii_lowered = (
        unicodedata.normalize("NFKD", name or "")
        .encode("ascii", "ignore")
        .decode("ascii")
        .lower()
    )
    cleaned = [ch if (ch.isalnum() or ch.isspace()) else " " for ch in ascii_lowered]
    return " ".join("".join(cleaned).split())
def product_name_tokens(name: str) -> set[str]:
    """Return the significant words of *name* after normalisation.

    Words shorter than three characters and the packaging abbreviations
    "pack", "paq", "und" and "uni" are left out.
    """
    ignored_words = {"pack", "paq", "und", "uni"}
    significant: set[str] = set()
    for word in normalize_product_name_for_match(name).split():
        if len(word) < 3 or word in ignored_words:
            continue
        significant.add(word)
    return significant
def should_merge_products(left: dict, right: dict) -> bool:
    """Decide whether two extracted products describe the same item.

    Both products must have a non-empty normalised name and prices within
    0.75 of each other. Given that, they merge when any of these holds:

    * the normalised names have a similarity ratio of at least 0.84;
    * they share the same category and at least 60% of the tokens of the
      shorter name;
    * they share the same category and one name is a prefix of the other.
    """
    name_a = normalize_product_name_for_match(left.get("producto", ""))
    name_b = normalize_product_name_for_match(right.get("producto", ""))
    if not (name_a and name_b):
        return False

    name_ratio = SequenceMatcher(None, name_a, name_b).ratio()

    tokens_a = product_name_tokens(name_a)
    tokens_b = product_name_tokens(name_b)
    if tokens_a and tokens_b:
        # Overlap is measured against the smaller token set.
        shared_ratio = len(tokens_a & tokens_b) / max(1, min(len(tokens_a), len(tokens_b)))
    else:
        shared_ratio = 0

    price_a = float(left.get("precio", 0) or 0)
    price_b = float(right.get("precio", 0) or 0)
    prices_close = abs(price_a - price_b) <= 0.75

    category_a = (left.get("categoria", "") or "").strip().lower()
    category_b = (right.get("categoria", "") or "").strip().lower()

    # Every merge rule requires close prices.
    if not prices_close:
        return False
    if name_ratio >= 0.84:
        return True
    if category_a != category_b:
        return False
    return shared_ratio >= 0.6 or name_a.startswith(name_b) or name_b.startswith(name_a)
def product_richness_score(item: dict) -> int:
    """Score how much information an extracted product carries.

    One point is given for each of the inspected fields that holds a
    meaningful value (not ``""``, ``None``, ``0`` or ``False``), plus one
    point per character of the product name, so longer names win ties.

    A missing or ``None`` product name contributes nothing; it is not
    stringified, so ``None`` no longer counts as the four-character text
    ``"None"``.
    """
    inspected = (
        item.get("producto"),
        item.get("precio"),
        item.get("cantidad"),
        item.get("unidad"),
        item.get("fechaCaducidad"),
        item.get("categoria"),
        item.get("notas"),
    )
    score = sum(1 for value in inspected if value not in ("", None, 0, False))
    score += len(str(item.get("producto") or ""))
    return score
def dedupe_products(products: list[dict]) -> list[dict]:
    """Collapse products that describe the same item, keeping the richest.

    Each product is compared against the ones kept so far, in order. It is
    appended when it matches none of them. Otherwise it replaces the first
    match only if its richness score is strictly higher.
    """
    unique: list[dict] = []
    for candidate in products:
        slot = next(
            (position for position, kept in enumerate(unique) if should_merge_products(kept, candidate)),
            None,
        )
        if slot is None:
            unique.append(candidate)
        elif product_richness_score(candidate) > product_richness_score(unique[slot]):
            unique[slot] = candidate
    return unique
async def process_receipt_images_and_build_message(
    session: AsyncSession,
    images: list[dict],
    source_label: str,
) -> str:
    """Extract products from receipt images, register them and summarise.

    Every image (a dict with ``"bytes"`` and ``"media_type"``) is processed
    in turn; the extracted products are pooled and de-duplicated before
    registration. When nothing usable was extracted, a hint on how to take
    a better photo is returned instead.
    """
    pooled: list[dict] = []
    for picture in images:
        pooled.extend(
            await extract_products_from_receipt_image(
                picture["bytes"],
                source_label,
                media_type=picture["media_type"],
            )
        )

    unique_products = dedupe_products(pooled)
    if unique_products:
        return await register_batch_and_build_message(session, unique_products, source_label)
    return (
        "No pude extraer productos confiables desde la imagen. "
        "Prueba con una foto mas nitida, mejor luz o una sola foto completa del ticket."
    )
async def process_photo_group_after_delay(media_group_id: str) -> None:
    """Process a buffered photo album once its photos had time to arrive.

    Waits 2.5 seconds, removes the group from ``PHOTO_GROUPS`` and, if it
    was still there, processes its images in a fresh session and sends the
    outcome to the chat. Any failure is reported to the chat as a message.
    """
    await asyncio.sleep(2.5)
    album = PHOTO_GROUPS.pop(media_group_id, None)
    if not album:
        return

    async with SessionLocal() as session:
        try:
            reply = await process_receipt_images_and_build_message(session, album["images"], "imagen")
            await send_chat_response(session, album["chat_id"], reply)
            await remember_episode(
                session,
                str(album["chat_id"]),
                "Se proceso un grupo de imagenes del ticket.",
            )
            await session.commit()
        except Exception as exc:
            await send_message(
                album["chat_id"],
                f"Hubo un error procesando el grupo de imagenes: {exc}",
            )
async def send_chat_response(session: AsyncSession, chat_id: int | str, text: str) -> None:
    """Send *text* to the chat, store it as an assistant message and commit."""
    chat_key = str(chat_id)
    await send_message(chat_id, text)
    await remember_message(session, chat_key, "assistant", text)
    await session.commit()
def missing_product_fields(payload: dict) -> list[str]:
    """List the required product fields that *payload* lacks.

    ``producto`` and ``unidad`` are missing when blank. ``cantidad`` is
    missing unless it converts to a number greater than zero; a value that
    cannot be converted (for example the word "dos") is reported as missing
    rather than raising, so the caller can ask the user for it.

    Returns:
        The missing field names, in the order producto, cantidad, unidad.
    """
    missing: list[str] = []
    if not str(payload.get("producto", "") or "").strip():
        missing.append("producto")

    try:
        quantity = float(payload.get("cantidad", 0) or 0)
    except (TypeError, ValueError):
        quantity = 0.0
    # "not > 0" (rather than "<= 0") also rejects NaN.
    if not quantity > 0:
        missing.append("cantidad")

    if not str(payload.get("unidad", "") or "").strip():
        missing.append("unidad")
    return missing
def missing_consumption_fields(payload: dict) -> list[str]:
    """List the required consumption fields that *payload* lacks.

    ``producto`` and ``unidad`` are missing when blank. ``cantidad`` is
    missing unless it converts to a number greater than zero; a value that
    cannot be converted (for example the word "medio") is reported as
    missing rather than raising, so the caller can ask the user for it.

    Returns:
        The missing field names, in the order producto, cantidad, unidad.
    """
    missing: list[str] = []
    if not str(payload.get("producto", "") or "").strip():
        missing.append("producto")

    try:
        quantity = float(payload.get("cantidad", 0) or 0)
    except (TypeError, ValueError):
        quantity = 0.0
    # "not > 0" (rather than "<= 0") also rejects NaN.
    if not quantity > 0:
        missing.append("cantidad")

    if not str(payload.get("unidad", "") or "").strip():
        missing.append("unidad")
    return missing
def build_clarification_question(action: str, missing: list[str]) -> str:
    """Build the question that asks the user for the *missing* fields.

    Known field names are replaced by a readable prompt; unknown names are
    used as-is. The opening sentence depends on whether *action* is
    ``"consumir"`` or anything else (treated as a registration).
    """
    prompts = {
        "producto": "que producto es exactamente",
        "cantidad": "que cantidad debo registrar",
        "unidad": "en que unidad va",
    }
    details = ", ".join(prompts.get(field, field) for field in missing)
    opening = (
        "Necesito una aclaracion antes de registrar el consumo"
        if action == "consumir"
        else "Necesito una aclaracion antes de guardar el registro"
    )
    return f"{opening}: {details}."
async def send_stock_summary(chat_id: int, session: AsyncSession, query: str) -> None:
    """Send the chat a stock listing for the products matching *query*.

    At most 12 grouped records are listed. When nothing matches, a
    "no records" message is sent instead.
    """
    matches = await list_products_grouped(session, query)
    if not matches:
        await send_chat_response(session, chat_id, "No encontre registros.")
        return

    rows = []
    for item in matches[:12]:
        # The sentinel expiry date is shown as "no expiry" text.
        expiry = "sin caducidad" if item["fechaCaducidad"] == NO_EXPIRY_DATE else item["fechaCaducidad"]
        rows.append(
            f"- {item['producto']}: stock {item['stockActual']}/{item['cantidad']} {item['unidad']}, vence {expiry}"
        )
    await send_chat_response(session, chat_id, "Stock encontrado:\n" + "\n".join(rows))
async def send_expiring_summary(chat_id: int, session: AsyncSession) -> None:
    """Send the chat the products expiring within the configured window.

    The window is ``settings.expiry_warning_days``. Any occurrence of the
    sentinel expiry date in a line is replaced by "sin caducidad".
    """
    expiring = await get_expiring_products(session, settings.expiry_warning_days)
    if not expiring:
        await send_chat_response(session, chat_id, "No hay productos proximos a vencer.")
        return

    rows = [
        f"- {item['producto']}: vence {item['fechaCaducidad']}, stock {item['stockActual']} {item['unidad']}".replace(
            NO_EXPIRY_DATE, "sin caducidad"
        )
        for item in expiring
    ]
    await send_chat_response(session, chat_id, "Productos por vencer:\n" + "\n".join(rows))
async def handle_register_text(session: AsyncSession, chat_id: int, raw_text: str, source: str = "telegram-texto") -> dict:
    """Register one or several products described in *raw_text*.

    Text that looks like a batch is extracted and registered as a batch.
    Otherwise a single product is extracted; if required fields are
    missing, a pending action is stored and the user is asked to clarify.

    Returns:
        ``{"ok": True}``, with ``"pending": True`` added when a
        clarification was requested.
    """
    chat_key = str(chat_id)

    if looks_like_batch_registration(raw_text):
        batch = await extract_products_batch(raw_text, source)
        summary = await register_batch_and_build_message(session, batch, "texto")
        await remember_episode(session, chat_key, f"Registro por lote procesado desde {source}.")
        await clear_pending_action(session, chat_key)
        await send_chat_response(session, chat_id, summary)
        return {"ok": True}

    extracted = await extract_product(raw_text, source)
    gaps = missing_product_fields(extracted)
    if gaps:
        question = build_clarification_question("registrar", gaps)
        # The original text is kept so the clarification can be merged into it later.
        await set_pending_action(session, chat_key, "registrar", raw_text, question)
        await send_chat_response(session, chat_id, question)
        return {"ok": True, "pending": True}

    record = await create_product(session, extracted)
    await clear_pending_action(session, chat_key)
    await remember_episode(
        session,
        chat_key,
        f"Se registro producto {record['producto']} por {record['cantidad']} {record['unidad']}.",
    )
    expiry_label = "sin caducidad" if record["fechaCaducidad"] == NO_EXPIRY_DATE else record["fechaCaducidad"]
    confirmation = (
        "Registro guardado.\n"
        f"Producto: {record['producto']}\n"
        f"Cantidad: {record['cantidad']} {record['unidad']}\n"
        f"Ingreso: {record['fechaIngreso']}\n"
        f"Caducidad: {expiry_label}"
    )
    await send_chat_response(session, chat_id, confirmation)
    return {"ok": True}
async def handle_consumption_text(session: AsyncSession, chat_id: int, raw_text: str, source: str = "telegram-consumo") -> dict:
    """Register the consumption described in *raw_text*.

    If required fields are missing from the extracted payload, a pending
    action is stored and the user is asked to clarify. Otherwise the
    consumption is applied and the remaining stock is reported.

    Returns:
        ``{"ok": True}``, with ``"pending": True`` added when a
        clarification was requested.
    """
    chat_key = str(chat_id)

    extracted = await extract_consumption(raw_text, source)
    gaps = missing_consumption_fields(extracted)
    if gaps:
        question = build_clarification_question("consumir", gaps)
        # The original text is kept so the clarification can be merged into it later.
        await set_pending_action(session, chat_key, "consumir", raw_text, question)
        await send_chat_response(session, chat_id, question)
        return {"ok": True, "pending": True}

    outcome = await consume_product(session, extracted)
    await clear_pending_action(session, chat_key)
    await remember_episode(
        session,
        chat_key,
        f"Se registro consumo de {outcome['consumedNow']} {outcome['unidad']} de {outcome['producto']}.",
    )
    confirmation = (
        "Consumo registrado.\n"
        f"Producto: {outcome['producto']}\n"
        f"Consumido: {outcome['consumedNow']} {outcome['unidad']}\n"
        f"Stock restante: {outcome['stockActual']} {outcome['unidad']}"
    )
    await send_chat_response(session, chat_id, confirmation)
    return {"ok": True}
async def resolve_pending_action(session: AsyncSession, chat_id: int, user_text: str, pending_action_type: str, raw_text: str) -> dict:
    """Retry a pending action with the user's clarification appended.

    The original text and the new reply are joined into one text, which is
    handed to the consumption handler when the pending action is
    ``"consumir"`` and to the registration handler otherwise.
    """
    combined_text = f"{raw_text}\nAclaracion adicional del usuario: {user_text}"
    handler = handle_consumption_text if pending_action_type == "consumir" else handle_register_text
    return await handler(session, chat_id, combined_text)
async def health() -> dict:
    """Return a constant payload signalling that the service is up."""
    # NOTE(review): no route decorator is visible in this view; confirm how
    # this handler is registered on the app.
    status = {"ok": True}
    return status
async def index() -> FileResponse:
    """Serve ``index.html`` from the static directory."""
    # NOTE(review): no route decorator is visible in this view; confirm how
    # this handler is registered on the app.
    landing_page = STATIC_DIR / "index.html"
    return FileResponse(landing_page)
async def api_list_products(query: str = "", session: AsyncSession = Depends(get_session)) -> dict:
    """Return the products matching *query* (all products when empty)."""
    # NOTE(review): no route decorator is visible in this view; confirm how
    # this handler is registered on the app.
    records = await list_products(session, query)
    return {"ok": True, "records": records}
async def api_create_product(payload: ProductCreate, session: AsyncSession = Depends(get_session)) -> dict:
    """Create a product from the validated request body and return it."""
    # NOTE(review): no route decorator is visible in this view; confirm how
    # this handler is registered on the app.
    record = await create_product(session, payload.model_dump())
    return {"ok": True, "record": record}
async def api_consume(payload: ConsumptionCreate, session: AsyncSession = Depends(get_session)) -> dict:
    """Register a consumption and return the resulting record.

    Raises:
        HTTPException: With status 400 when the consumption is rejected
            with a ``ValueError``.
    """
    # NOTE(review): no route decorator is visible in this view; confirm how
    # this handler is registered on the app.
    try:
        record = await consume_product(session, payload.model_dump())
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    return {"ok": True, "record": record}
async def api_extract_product(payload: TextExtractionRequest) -> dict:
    """Extract a product payload from free text without storing it."""
    # NOTE(review): no route decorator is visible in this view; confirm how
    # this handler is registered on the app.
    product = await extract_product(payload.text, "web")
    return {"ok": True, "product": product}
async def api_search(payload: SearchRequest, session: AsyncSession = Depends(get_session)) -> dict:
    """Answer a natural-language question using the grouped inventory."""
    # NOTE(review): no route decorator is visible in this view; confirm how
    # this handler is registered on the app.
    inventory = await list_products_grouped(session, "")
    answer = await answer_question(payload.question, inventory)
    return {"ok": True, "answer": answer}
async def api_diagram(payload: DiagramRequest, session: AsyncSession = Depends(get_session)) -> dict:
    """Return diagram source for the inventory, built from an instruction.

    Instructions mentioning a chart keyword ("grafico", "gráfico", "chart",
    "barras") go to the inventory chart builder; everything else goes to
    the Mermaid builder.
    """
    # NOTE(review): no route decorator is visible in this view; confirm how
    # this handler is registered on the app.
    inventory = await list_products_grouped(session, "")
    instruction = payload.instruction.strip()
    lowered = instruction.lower()
    wants_chart = any(keyword in lowered for keyword in ("grafico", "gráfico", "chart", "barras"))
    if wants_chart:
        diagram = build_inventory_chart(instruction, inventory)
    else:
        diagram = await build_mermaid(instruction, inventory)
    return {"ok": True, "mermaid": diagram}
async def api_expiring(days: int | None = None, session: AsyncSession = Depends(get_session)) -> dict:
    """Return the products expiring within *days*.

    A missing or zero *days* falls back to ``settings.expiry_warning_days``.
    """
    # NOTE(review): no route decorator is visible in this view; confirm how
    # this handler is registered on the app.
    window = days if days else settings.expiry_warning_days
    records = await get_expiring_products(session, window)
    return {"ok": True, "records": records}
async def api_sync_from_sheets(session: AsyncSession = Depends(get_session)) -> dict:
    """Run a sync from Sheets and return its result."""
    # NOTE(review): no route decorator is visible in this view; confirm how
    # this handler is registered on the app.
    result = await sync_from_sheets(session)
    return {"ok": True, "result": result}
async def api_sync_to_sheets() -> dict:
    """Force one pass over the sync queue and return its result."""
    # NOTE(review): no route decorator is visible in this view; confirm how
    # this handler is registered on the app.
    result = await process_sync_queue_once(SessionLocal, force=True)
    return {"ok": True, "result": result}
async def api_set_webhook() -> dict:
    """Call ``set_webhook`` and return its response unchanged."""
    # NOTE(review): no route decorator is visible in this view; confirm how
    # this handler is registered on the app.
    response = await set_webhook()
    return response
async def telegram_webhook(
    request: Request,
    session: AsyncSession = Depends(get_session),
    x_telegram_bot_api_secret_token: str | None = Header(default=None),
) -> dict:
    """Handle one Telegram update: audio, photo or text.

    Flow:
      1. Reject the call when a webhook secret is configured and the header
         does not match it.
      2. Ignore updates that carry no message or no chat id.
      3. Register the sender, then dispatch on the message content:
         voice/audio is transcribed and registered as a batch; photos are
         processed as receipts (albums are buffered and processed by a
         delayed background task); text is handled through slash commands,
         a pending clarification, or the AI router.

    Errors raised while handling the message are rolled back and reported
    to the chat, and the handler still answers ``{"ok": True}``.

    Raises:
        HTTPException: With status 401 when the secret token does not match.
    """
    # NOTE(review): no route decorator is visible in this view; confirm how
    # this handler is registered on the app.
    if settings.telegram_webhook_secret and x_telegram_bot_api_secret_token != settings.telegram_webhook_secret:
        raise HTTPException(status_code=401, detail="Unauthorized")

    update = await request.json()
    message = update.get("message") or update.get("edited_message")
    if not message or not message.get("chat", {}).get("id"):
        return {"ok": True}

    chat_id = message["chat"]["id"]
    sender = message.get("from", {})
    await register_telegram_user(
        session,
        str(chat_id),
        sender.get("username", ""),
        sender.get("first_name", ""),
        sender.get("last_name", ""),
    )

    try:
        # ---- Voice / audio: transcribe and register as a batch ----------
        if message.get("voice") or message.get("audio"):
            audio = message.get("voice") or message.get("audio")
            file_url = await get_file_url(audio["file_id"])
            audio_bytes = await download_file(file_url)
            transcript = await transcribe_audio_bytes(audio_bytes, ".ogg")
            if not transcript.strip():
                raise ValueError("No pude entender el audio. Intenta hablar mas claro o enviar texto.")
            await remember_message(session, str(chat_id), "user", f"[audio] {transcript}")
            products = await extract_products_batch(transcript, "telegram-audio")
            await remember_episode(session, str(chat_id), "Se recibio un audio para registro de inventario.")
            await send_chat_response(session, chat_id, await register_batch_and_build_message(session, products, "audio"))
            return {"ok": True}

        # ---- Photo: single receipt or part of an album -------------------
        if message.get("photo"):
            # The last entry of the photo list is the one downloaded.
            photo = message["photo"][-1]
            file_url = await get_file_url(photo["file_id"])
            image_bytes = await download_file(file_url)
            media_type = guess_photo_media_type(file_url)
            await remember_message(session, str(chat_id), "user", "[imagen] ticket o producto enviado")

            media_group_id = message.get("media_group_id")
            if media_group_id:
                # Album photos arrive as separate updates: buffer them and
                # let a single delayed task process the whole group.
                group = PHOTO_GROUPS.setdefault(
                    media_group_id,
                    {"chat_id": chat_id, "images": [], "task_started": False},
                )
                group["images"].append({"bytes": image_bytes, "media_type": media_type})
                if not group["task_started"]:
                    group["task_started"] = True
                    _spawn_webhook_task(process_photo_group_after_delay(media_group_id))
                return {"ok": True}

            reply = await process_receipt_images_and_build_message(
                session,
                [{"bytes": image_bytes, "media_type": media_type}],
                "imagen",
            )
            # Record the episode before sending, as the other handlers do, so
            # it is covered by the commit performed in send_chat_response.
            await remember_episode(session, str(chat_id), "Se proceso una imagen para extraer productos.")
            await send_chat_response(session, chat_id, reply)
            return {"ok": True}

        # ---- Text --------------------------------------------------------
        text = str(message.get("text", "")).strip()
        if not text:
            await send_chat_response(session, chat_id, "Envia texto, audio o imagen para registrar o consultar productos.")
            return {"ok": True}

        await remember_message(session, str(chat_id), "user", text)
        await capture_semantic_memory(session, str(chat_id), text)

        if text.startswith("/start"):
            await send_chat_response(
                session,
                chat_id,
                "Puedes hablarme en lenguaje natural. Ejemplos:\n- compre 5 kilos de arroz por 21.90\n- consumimos medio kilo de arroz\n- cuanto arroz queda\n- que productos vencen esta semana\n- armame un diagrama del inventario",
            )
            return {"ok": True}

        # A non-command reply while a clarification is pending is treated as
        # the answer to that clarification.
        pending = await get_pending_action(session, str(chat_id))
        if pending and not text.startswith("/"):
            return await resolve_pending_action(session, chat_id, text, pending.action_type, pending.raw_text)

        # ---- Slash commands ---------------------------------------------
        if text.startswith("/buscar "):
            records = await list_products_grouped(session, "")
            answer = await answer_question(text.replace("/buscar", "", 1).strip(), records)
            await send_chat_response(session, chat_id, answer)
            return {"ok": True}
        if text.startswith("/diagrama "):
            records = await list_products_grouped(session, "")
            instruction = text.replace("/diagrama", "", 1).strip()
            mermaid = await _render_inventory_diagram(instruction, records)
            await send_chat_response(session, chat_id, mermaid)
            return {"ok": True}
        if text.startswith("/vencimientos"):
            await send_expiring_summary(chat_id, session)
            return {"ok": True}
        if text.startswith("/stock "):
            await send_stock_summary(chat_id, session, text.replace("/stock", "", 1).strip())
            return {"ok": True}
        if text.startswith("/consumir "):
            await handle_consumption_text(session, chat_id, text.replace("/consumir", "", 1).strip())
            return {"ok": True}
        if text.startswith("/registrar "):
            await handle_register_text(session, chat_id, text.replace("/registrar", "", 1).strip())
            return {"ok": True}

        # ---- Natural language: let the AI router pick the action --------
        memory_context = await build_memory_context(session, str(chat_id))
        route = await route_telegram_text(text, memory_context)
        action = str(route.get("accion", "ayuda") or "ayuda").strip().lower()
        route_text = str(route.get("texto", "") or text).strip() or text

        if action == "vencimientos":
            await send_expiring_summary(chat_id, session)
            return {"ok": True}
        if action == "stock":
            query = str(route.get("producto", "") or route_text).strip()
            await send_stock_summary(chat_id, session, query)
            return {"ok": True}
        if action == "buscar":
            question = str(route.get("pregunta", "") or route_text).strip()
            records = await list_products_grouped(session, "")
            answer = await answer_question(question, records)
            await send_chat_response(session, chat_id, answer)
            return {"ok": True}
        if action == "diagrama":
            instruction = str(route.get("instruccion", "") or route_text).strip()
            records = await list_products_grouped(session, "")
            mermaid = await _render_inventory_diagram(instruction, records)
            # Recorded before sending so the commit in send_chat_response covers it.
            await remember_episode(session, str(chat_id), "Se genero un grafico o diagrama del inventario.")
            await send_chat_response(session, chat_id, mermaid)
            return {"ok": True}
        if action == "consumir":
            await handle_consumption_text(session, chat_id, route_text)
            return {"ok": True}
        if action == "registrar":
            await handle_register_text(session, chat_id, route_text)
            return {"ok": True}
        if action == "memoria":
            await send_chat_response(session, chat_id, await recall_recent_chat(session, str(chat_id)))
            return {"ok": True}

        # Unknown action: fall back to the usage hint.
        await send_chat_response(
            session,
            chat_id,
            "Dime lo que necesitas en lenguaje natural. Ejemplos: 'compre 2 kilos de arroz', 'consumimos medio kilo de arroz', 'cuanto arroz queda', 'que vence esta semana'.",
        )
    except ValueError as exc:
        # ValueError messages are written for the user and sent verbatim.
        await session.rollback()
        await send_chat_response(session, chat_id, str(exc))
    except Exception as exc:
        await session.rollback()
        await send_chat_response(session, chat_id, f"Hubo un error: {exc}")
    return {"ok": True}


# Strong references to tasks started by the webhook. The event loop only
# keeps weak references to tasks, so a task nobody else references could be
# garbage-collected before it finishes.
_WEBHOOK_BACKGROUND_TASKS: set = set()


def _spawn_webhook_task(coro) -> asyncio.Task:
    """Schedule *coro* as a task and hold a reference to it until it is done."""
    task = asyncio.create_task(coro)
    _WEBHOOK_BACKGROUND_TASKS.add(task)
    task.add_done_callback(_WEBHOOK_BACKGROUND_TASKS.discard)
    return task


async def _render_inventory_diagram(instruction: str, records):
    """Build a chart when *instruction* mentions a chart keyword, else a Mermaid diagram."""
    lowered = instruction.lower()
    if any(token in lowered for token in ("grafico", "gráfico", "chart", "barras")):
        return build_inventory_chart(instruction, records)
    return await build_mermaid(instruction, records)