Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import json | |
| import re | |
| import base64 | |
| from typing import Any | |
| from .datetime_utils import NO_EXPIRY_DATE, today_in_config_timezone | |
| import httpx | |
| from ..config import settings | |
| from .expiry_rules import list_supported_categories | |
| def extract_json(text: str) -> dict[str, Any]: | |
| fenced = re.search(r"```json\s*(.*?)```", text, re.DOTALL | re.IGNORECASE) | |
| if fenced: | |
| parsed = json.loads(fenced.group(1)) | |
| if isinstance(parsed, list): | |
| if parsed and isinstance(parsed[0], dict): | |
| return parsed[0] | |
| raise ValueError("Claude devolvio una lista JSON invalida para este flujo.") | |
| return parsed | |
| inline = re.search(r"\{.*\}", text, re.DOTALL) | |
| if inline: | |
| parsed = json.loads(inline.group(0)) | |
| if isinstance(parsed, list): | |
| if parsed and isinstance(parsed[0], dict): | |
| return parsed[0] | |
| raise ValueError("Claude devolvio una lista JSON invalida para este flujo.") | |
| return parsed | |
| list_inline = re.search(r"\[.*\]", text, re.DOTALL) | |
| if list_inline: | |
| parsed = json.loads(list_inline.group(0)) | |
| if parsed and isinstance(parsed[0], dict): | |
| return parsed[0] | |
| raise ValueError("Claude devolvio una lista JSON invalida para este flujo.") | |
| raise ValueError("No se encontro JSON valido en la respuesta de Claude.") | |
| def extract_json_list(text: str) -> list[dict[str, Any]]: | |
| fenced = re.search(r"```json\s*(.*?)```", text, re.DOTALL | re.IGNORECASE) | |
| if fenced: | |
| parsed = json.loads(fenced.group(1)) | |
| else: | |
| list_inline = re.search(r"\[.*\]", text, re.DOTALL) | |
| if list_inline: | |
| parsed = json.loads(list_inline.group(0)) | |
| else: | |
| parsed = extract_json(text) | |
| if isinstance(parsed, dict): | |
| return [parsed] | |
| if isinstance(parsed, list): | |
| items = [item for item in parsed if isinstance(item, dict)] | |
| if items: | |
| return items | |
| raise ValueError("No se encontro una lista JSON valida en la respuesta de Claude.") | |
| async def call_claude(system: str, user_content: str, max_tokens: int = 1200) -> str: | |
| candidate_models = [] | |
| for model in [ | |
| settings.anthropic_model, | |
| "claude-haiku-4-5", | |
| "claude-3-haiku-20240307", | |
| ]: | |
| if model and model not in candidate_models: | |
| candidate_models.append(model) | |
| async with httpx.AsyncClient(timeout=90) as client: | |
| last_error = None | |
| for model in candidate_models: | |
| response = await client.post( | |
| "https://api.anthropic.com/v1/messages", | |
| headers={ | |
| "content-type": "application/json", | |
| "x-api-key": settings.anthropic_api_key, | |
| "anthropic-version": "2023-06-01", | |
| }, | |
| json={ | |
| "model": model, | |
| "max_tokens": max_tokens, | |
| "system": system, | |
| "messages": [{"role": "user", "content": user_content}], | |
| }, | |
| ) | |
| if response.is_success: | |
| payload = response.json() | |
| return "\n".join( | |
| part["text"] for part in payload.get("content", []) if part.get("type") == "text" | |
| ) | |
| body = response.text | |
| last_error = f"Anthropic devolvio {response.status_code} con modelo {model}: {body}" | |
| if response.status_code not in (400, 404): | |
| raise ValueError(last_error) | |
| raise ValueError(last_error or "No se pudo consultar Anthropic.") | |
| async def call_claude_with_image( | |
| system: str, | |
| prompt: str, | |
| image_bytes: bytes, | |
| media_type: str = "image/jpeg", | |
| max_tokens: int = 2400, | |
| ) -> str: | |
| candidate_models = [] | |
| for model in [settings.anthropic_model, "claude-haiku-4-5", "claude-haiku-4-5-20251001"]: | |
| if model and model not in candidate_models: | |
| candidate_models.append(model) | |
| image_b64 = base64.b64encode(image_bytes).decode("utf-8") | |
| async with httpx.AsyncClient(timeout=120) as client: | |
| last_error = None | |
| for model in candidate_models: | |
| response = await client.post( | |
| "https://api.anthropic.com/v1/messages", | |
| headers={ | |
| "content-type": "application/json", | |
| "x-api-key": settings.anthropic_api_key, | |
| "anthropic-version": "2023-06-01", | |
| }, | |
| json={ | |
| "model": model, | |
| "max_tokens": max_tokens, | |
| "system": system, | |
| "messages": [ | |
| { | |
| "role": "user", | |
| "content": [ | |
| { | |
| "type": "image", | |
| "source": { | |
| "type": "base64", | |
| "media_type": media_type, | |
| "data": image_b64, | |
| }, | |
| }, | |
| {"type": "text", "text": prompt}, | |
| ], | |
| } | |
| ], | |
| }, | |
| ) | |
| if response.is_success: | |
| payload = response.json() | |
| return "\n".join( | |
| part["text"] for part in payload.get("content", []) if part.get("type") == "text" | |
| ) | |
| last_error = f"Anthropic vision devolvio {response.status_code}: {response.text}" | |
| if response.status_code not in (400, 404): | |
| raise ValueError(last_error) | |
| raise ValueError(last_error or "No se pudo consultar Anthropic con imagen.") | |
| async def extract_product(raw_text: str, source: str) -> dict[str, Any]: | |
| today = today_in_config_timezone() | |
| system = f""" | |
| Eres un extractor de inventario domestico para mensajes de voz y texto libre. | |
| Tu trabajo es leer transcripciones largas, desordenadas o conversacionales y extraer UN solo registro de producto. | |
| Debes priorizar el producto principal que claramente se esta intentando registrar. | |
| Si el texto tiene relleno, comentarios o contexto extra, conservalo resumido en "notas". | |
| Devuelve solo JSON exacto. | |
| Fecha actual de referencia: {today} | |
| Esquema: | |
| {{ | |
| "producto": "string", | |
| "precio": 0, | |
| "cantidad": 0, | |
| "unidad": "unidad", | |
| "fechaCaducidad": "YYYY-MM-DD", | |
| "fechaIngreso": "YYYY-MM-DD", | |
| "fechaProduccion": "YYYY-MM-DD", | |
| "notas": "string", | |
| "fuente": "{source}" | |
| }} | |
| Reglas: | |
| - Convierte fechas a YYYY-MM-DD. | |
| - Entiende fechas en espanol, por ejemplo "7 de agosto de 2026". | |
| - Si el usuario dice "hoy", usa la fecha actual de referencia. | |
| - "fechaIngreso" es la fecha en la que el producto entro a la casa o inventario. | |
| - "fechaProduccion" es la fecha de fabricacion, emision o produccion. | |
| - Si no se menciona fechaProduccion, usa la fecha actual de referencia. | |
| - Si el producto no caduca o el usuario dice "no vence", "no caduca" o equivalente, usa "{NO_EXPIRY_DATE}" en fechaCaducidad. | |
| - Entiende cantidades habladas como "cinco", "medio", "una", "dos", "media docena" cuando sea razonable. | |
| - Entiende monedas y precios hablados como "veintiuno con noventa", "21.9 soles", "cuatro con veinte". | |
| - Normaliza unidades comunes: kg, kilo, kilos, g, gramo, gramos, litro, litros, ml, unidad, unidades, paquete, paquetes, lata, latas, botella, botellas. | |
| - Si hay varias frases largas, elige el producto que tenga mas datos completos. | |
| - Si falta fechaIngreso, usa la fecha actual de referencia. | |
| - Si un campo no aparece, usa 0 o cadena vacia. | |
| - No inventes datos no mencionados explicitamente o inferibles de forma directa. | |
| """ | |
| text = await call_claude( | |
| system, | |
| f"Texto a estructurar:\n{raw_text}\n\nDevuelve solo el JSON final del producto principal.", | |
| ) | |
| return extract_json(text) | |
| async def extract_products_batch(raw_text: str, source: str) -> list[dict[str, Any]]: | |
| today = today_in_config_timezone() | |
| system = f""" | |
| Eres un extractor de inventario domestico para audios largos de compras. | |
| Debes leer una transcripcion larga y devolver una LISTA JSON de productos. | |
| No devuelvas explicaciones. Solo JSON. | |
| Fecha actual de referencia: {today} | |
| Cada elemento debe seguir este esquema: | |
| {{ | |
| "producto": "string", | |
| "precio": 0, | |
| "cantidad": 0, | |
| "unidad": "unidad", | |
| "fechaCaducidad": "YYYY-MM-DD", | |
| "fechaIngreso": "YYYY-MM-DD", | |
| "fechaProduccion": "YYYY-MM-DD", | |
| "notas": "string", | |
| "fuente": "{source}" | |
| }} | |
| Reglas: | |
| - Devuelve una lista con todos los productos identificables de la transcripcion. | |
| - Si un producto no tiene suficientes datos minimos para registro, omitelo. | |
| - Datos minimos: producto, cantidad, unidad, fechaIngreso, fechaProduccion y fechaCaducidad. | |
| - Convierte fechas en espanol a YYYY-MM-DD. | |
| - Si el usuario dice "hoy", usa la fecha actual de referencia. | |
| - Si no se menciona fechaIngreso, usa la fecha actual de referencia. | |
| - Si no se menciona fechaProduccion, usa la fecha actual de referencia. | |
| - Entiende cantidades habladas como "cinco", "medio", "una", "dos", "media docena" cuando sea razonable. | |
| - Entiende precios hablados como "veintiuno con noventa", "30 con 16", "7 soles con 62". | |
| - Si se menciona "no vence", "no caduca" o equivalente, usa "{NO_EXPIRY_DATE}" en fechaCaducidad. | |
| - Si hay incertidumbre tipo "ponle", "aproximadamente", "más o menos", extraela igualmente y guarda esa aclaracion breve en notas. | |
| - Normaliza unidades comunes: kg, kilo, kilos, g, gramo, gramos, litro, litros, ml, unidad, unidades, paquete, paquetes, lata, latas, botella, botellas. | |
| - No inventes productos ni fechas no mencionadas. | |
| """ | |
| text = await call_claude( | |
| system, | |
| f"Transcripcion larga:\n{raw_text}\n\nDevuelve solo una lista JSON con todos los productos registrables.", | |
| max_tokens=2400, | |
| ) | |
| return extract_json_list(text) | |
| async def extract_products_from_receipt_image( | |
| image_bytes: bytes, | |
| source: str, | |
| media_type: str = "image/jpeg", | |
| ) -> list[dict[str, Any]]: | |
| today = today_in_config_timezone() | |
| categories = ", ".join(list_supported_categories()) | |
| system = f""" | |
| Eres un extractor de productos desde imagenes de boletas o tickets. | |
| Debes devolver solo una LISTA JSON. | |
| Fecha actual de referencia: {today} | |
| Categorias permitidas: {categories} | |
| Cada elemento debe seguir este esquema: | |
| {{ | |
| "producto": "string", | |
| "precio": 0, | |
| "cantidad": 0, | |
| "unidad": "unidad", | |
| "fechaCaducidad": "", | |
| "fechaIngreso": "{today}", | |
| "fechaProduccion": "{today}", | |
| "categoria": "una_categoria_valida", | |
| "caducidadEstimada": false, | |
| "notas": "string", | |
| "fuente": "{source}" | |
| }} | |
| Reglas: | |
| - Extrae todos los productos identificables de la compra. | |
| - Prioriza precision sobre cantidad: si una linea es dudosa o ilegible, omitela. | |
| - No inventes marcas, sabores ni palabras que no se vean con suficiente claridad. | |
| - Si el nombre exacto no es legible, devuelve un nombre mas generico y corto en vez de alucinar texto. | |
| - Si varias lineas parecen ser la misma referencia repetida por la misma boleta o por otra captura del mismo ticket, usa un nombre consistente. | |
| - Si la boleta muestra fecha de compra, usala como fechaIngreso para todos los items; si no, usa la fecha actual de referencia. | |
| - Si ves una fecha de vencimiento real asociada a un producto, usala y deja caducidadEstimada en false. | |
| - Si no ves fecha de vencimiento real, deja fechaCaducidad vacia, asigna una categoria valida y deja caducidadEstimada en false; el backend estimara la fecha. | |
| - Si no ves fecha de produccion, usa la fecha actual de referencia. | |
| - Si la linea muestra peso, volumen o multiplicador como kg, g, ml, L, x2, x3, usa esa cantidad y unidad. | |
| - Si no puedes saber cantidad exacta, usa 1 solo cuando el item realmente parezca una unidad individual. | |
| - Usa "unidad" solo si no se ve una unidad mejor. | |
| - Si no puedes extraer un producto con nombre y precio razonables, omitelo. | |
| - No devuelvas texto fuera del JSON. | |
| """ | |
| prompt = """ | |
| Analiza esta imagen de boleta o ticket y devuelve una lista JSON con productos registrables. | |
| Si la imagen parece ser una captura parcial de un ticket largo, extrae solo las lineas claramente visibles. | |
| No intentes completar texto cortado. | |
| """ | |
| text = await call_claude_with_image(system, prompt, image_bytes, media_type=media_type, max_tokens=2800) | |
| return extract_json_list(text) | |
| async def extract_consumption(raw_text: str, source: str) -> dict[str, Any]: | |
| today = today_in_config_timezone() | |
| system = f""" | |
| Eres un extractor de consumo de inventario para texto libre y audio transcrito. | |
| Devuelve solo JSON exacto. | |
| Fecha actual de referencia: {today} | |
| Esquema: | |
| {{ | |
| "producto": "string", | |
| "cantidad": 0, | |
| "unidad": "unidad", | |
| "notas": "string", | |
| "fuente": "{source}" | |
| }} | |
| Reglas: | |
| - Entiende cantidades habladas en espanol. | |
| - Si el usuario agrega contexto como "para desayuno" o "use medio kilo", mandalo a notas si no forma parte del nombre. | |
| - Elige solo un producto principal. | |
| """ | |
| text = await call_claude(system, f"Texto de consumo:\n{raw_text}\n\nDevuelve solo el JSON final.") | |
| return extract_json(text) | |
| async def route_telegram_text(raw_text: str, memory_context: str = "") -> dict[str, Any]: | |
| system = """ | |
| Eres un router de intenciones para un bot de inventario domestico por Telegram. | |
| Debes decidir que quiere hacer el usuario y devolver solo JSON exacto. | |
| Esquema: | |
| { | |
| "accion": "registrar|consumir|stock|buscar|vencimientos|diagrama|memoria|ayuda", | |
| "texto": "texto util para la accion", | |
| "producto": "nombre corto del producto si aplica", | |
| "pregunta": "pregunta para consulta si aplica", | |
| "instruccion": "instruccion de diagrama si aplica" | |
| } | |
| Reglas: | |
| - "registrar" cuando el usuario quiere ingresar compras, productos nuevos, lotes, fotos, boletas o agregar inventario. | |
| - "consumir" cuando el usuario dice que uso, gasto, consumio o queda menos de un producto. | |
| - "stock" cuando pregunta cuanto queda, cuanto hay, si hay stock o disponibilidad de un producto. | |
| - "vencimientos" cuando pregunta que vence pronto, proximos a vencer o caducidades. | |
| - "diagrama" cuando pide un cuadro, flujo, grafico o diagrama. | |
| - "memoria" cuando pregunta que dijo antes, que hablaron, que recuerdas o hace referencia al contexto reciente. | |
| - "buscar" para preguntas generales sobre los registros, comparaciones o analisis. | |
| - "ayuda" solo si el texto es saludo, ambiguo o no alcanza para decidir. | |
| - Conserva en "texto" el mensaje original limpiado. | |
| - Si aplica, llena "producto", "pregunta" o "instruccion". | |
| """ | |
| user_prompt = f"Mensaje del usuario:\n{raw_text}\n" | |
| if memory_context.strip(): | |
| user_prompt += f"\nContexto del chat:\n{memory_context}\n" | |
| user_prompt += "\nDevuelve solo el JSON del routing." | |
| text = await call_claude(system, user_prompt, max_tokens=400) | |
| return extract_json(text) | |
| def _records_to_context(records: list[dict[str, Any]]) -> str: | |
| return "\n".join( | |
| f"{index + 1}. producto={r.get('producto')}; precio={r.get('precio')}; cantidad={r.get('cantidad')}; unidad={r.get('unidad')}; stockActual={r.get('stockActual')}; fechaCaducidad={r.get('fechaCaducidad')}; consumidoTotal={r.get('consumidoTotal')}" | |
| for index, r in enumerate(records) | |
| ) | |
| async def answer_question(question: str, records: list[dict[str, Any]]) -> str: | |
| system = """ | |
| Eres un asistente de inventario domestico. | |
| Responde solo con informacion sustentada por los registros. | |
| Si los datos no alcanzan, dilo claramente. | |
| """ | |
| return await call_claude(system, f"Pregunta:\n{question}\n\nRegistros:\n{_records_to_context(records)}") | |
| async def build_mermaid(instruction: str, records: list[dict[str, Any]]) -> str: | |
| system = """ | |
| Genera solo codigo Mermaid valido. | |
| No uses markdown. | |
| """ | |
| return await call_claude( | |
| system, | |
| f"Instruccion:\n{instruction}\n\nRegistros:\n{_records_to_context(records)}", | |
| ) | |
| def _safe_label(label: str) -> str: | |
| return re.sub(r"[^a-zA-Z0-9 ]+", "", label).strip()[:18] or "item" | |
| def build_inventory_chart(instruction: str, records: list[dict[str, Any]]) -> str: | |
| lowered = instruction.lower() | |
| if "consumo" in lowered or "gast" in lowered: | |
| ordered = sorted(records, key=lambda item: float(item.get("consumidoTotal", 0) or 0), reverse=True)[:8] | |
| title = "Consumo acumulado" | |
| series_name = "Consumido" | |
| values = [float(item.get("consumidoTotal", 0) or 0) for item in ordered] | |
| elif "vence" in lowered or "caduc" in lowered: | |
| ordered = sorted( | |
| [item for item in records if item.get("fechaCaducidad") not in ("", NO_EXPIRY_DATE)], | |
| key=lambda item: item.get("fechaCaducidad", ""), | |
| )[:8] | |
| title = "Productos por vencer" | |
| series_name = "Stock" | |
| values = [float(item.get("stockActual", 0) or 0) for item in ordered] | |
| else: | |
| ordered = sorted(records, key=lambda item: float(item.get("stockActual", 0) or 0), reverse=True)[:8] | |
| title = "Stock actual" | |
| series_name = "Stock" | |
| values = [float(item.get("stockActual", 0) or 0) for item in ordered] | |
| if not ordered: | |
| return "xychart-beta\ntitle \"Sin datos\"\nx-axis []\nbar []" | |
| labels = ", ".join(f"\"{_safe_label(item.get('producto', 'item'))}\"" for item in ordered) | |
| bar_values = ", ".join(str(round(value, 2)) for value in values) | |
| return ( | |
| "xychart-beta\n" | |
| f"title \"{title}\"\n" | |
| f"x-axis [{labels}]\n" | |
| f"bar \"{series_name}\" [{bar_values}]" | |
| ) | |