from __future__ import annotations import json import re import base64 from typing import Any from .datetime_utils import NO_EXPIRY_DATE, today_in_config_timezone import httpx from ..config import settings from .expiry_rules import list_supported_categories def extract_json(text: str) -> dict[str, Any]: fenced = re.search(r"```json\s*(.*?)```", text, re.DOTALL | re.IGNORECASE) if fenced: parsed = json.loads(fenced.group(1)) if isinstance(parsed, list): if parsed and isinstance(parsed[0], dict): return parsed[0] raise ValueError("Claude devolvio una lista JSON invalida para este flujo.") return parsed inline = re.search(r"\{.*\}", text, re.DOTALL) if inline: parsed = json.loads(inline.group(0)) if isinstance(parsed, list): if parsed and isinstance(parsed[0], dict): return parsed[0] raise ValueError("Claude devolvio una lista JSON invalida para este flujo.") return parsed list_inline = re.search(r"\[.*\]", text, re.DOTALL) if list_inline: parsed = json.loads(list_inline.group(0)) if parsed and isinstance(parsed[0], dict): return parsed[0] raise ValueError("Claude devolvio una lista JSON invalida para este flujo.") raise ValueError("No se encontro JSON valido en la respuesta de Claude.") def extract_json_list(text: str) -> list[dict[str, Any]]: fenced = re.search(r"```json\s*(.*?)```", text, re.DOTALL | re.IGNORECASE) if fenced: parsed = json.loads(fenced.group(1)) else: list_inline = re.search(r"\[.*\]", text, re.DOTALL) if list_inline: parsed = json.loads(list_inline.group(0)) else: parsed = extract_json(text) if isinstance(parsed, dict): return [parsed] if isinstance(parsed, list): items = [item for item in parsed if isinstance(item, dict)] if items: return items raise ValueError("No se encontro una lista JSON valida en la respuesta de Claude.") async def call_claude(system: str, user_content: str, max_tokens: int = 1200) -> str: candidate_models = [] for model in [ settings.anthropic_model, "claude-haiku-4-5", "claude-3-haiku-20240307", ]: if model and model not in candidate_models: candidate_models.append(model) async with httpx.AsyncClient(timeout=90) as client: last_error = None for model in candidate_models: response = await client.post( "https://api.anthropic.com/v1/messages", headers={ "content-type": "application/json", "x-api-key": settings.anthropic_api_key, "anthropic-version": "2023-06-01", }, json={ "model": model, "max_tokens": max_tokens, "system": system, "messages": [{"role": "user", "content": user_content}], }, ) if response.is_success: payload = response.json() return "\n".join( part["text"] for part in payload.get("content", []) if part.get("type") == "text" ) body = response.text last_error = f"Anthropic devolvio {response.status_code} con modelo {model}: {body}" if response.status_code not in (400, 404): raise ValueError(last_error) raise ValueError(last_error or "No se pudo consultar Anthropic.") async def call_claude_with_image( system: str, prompt: str, image_bytes: bytes, media_type: str = "image/jpeg", max_tokens: int = 2400, ) -> str: candidate_models = [] for model in [settings.anthropic_model, "claude-haiku-4-5", "claude-haiku-4-5-20251001"]: if model and model not in candidate_models: candidate_models.append(model) image_b64 = base64.b64encode(image_bytes).decode("utf-8") async with httpx.AsyncClient(timeout=120) as client: last_error = None for model in candidate_models: response = await client.post( "https://api.anthropic.com/v1/messages", headers={ "content-type": "application/json", "x-api-key": settings.anthropic_api_key, "anthropic-version": "2023-06-01", }, json={ "model": model, "max_tokens": max_tokens, "system": system, "messages": [ { "role": "user", "content": [ { "type": "image", "source": { "type": "base64", "media_type": media_type, "data": image_b64, }, }, {"type": "text", "text": prompt}, ], } ], }, ) if response.is_success: payload = response.json() return "\n".join( part["text"] for part in payload.get("content", []) if part.get("type") == "text" ) last_error = f"Anthropic vision devolvio {response.status_code}: {response.text}" if response.status_code not in (400, 404): raise ValueError(last_error) raise ValueError(last_error or "No se pudo consultar Anthropic con imagen.") async def extract_product(raw_text: str, source: str) -> dict[str, Any]: today = today_in_config_timezone() system = f""" Eres un extractor de inventario domestico para mensajes de voz y texto libre. Tu trabajo es leer transcripciones largas, desordenadas o conversacionales y extraer UN solo registro de producto. Debes priorizar el producto principal que claramente se esta intentando registrar. Si el texto tiene relleno, comentarios o contexto extra, conservalo resumido en "notas". Devuelve solo JSON exacto. Fecha actual de referencia: {today} Esquema: {{ "producto": "string", "precio": 0, "cantidad": 0, "unidad": "unidad", "fechaCaducidad": "YYYY-MM-DD", "fechaIngreso": "YYYY-MM-DD", "fechaProduccion": "YYYY-MM-DD", "notas": "string", "fuente": "{source}" }} Reglas: - Convierte fechas a YYYY-MM-DD. - Entiende fechas en espanol, por ejemplo "7 de agosto de 2026". - Si el usuario dice "hoy", usa la fecha actual de referencia. - "fechaIngreso" es la fecha en la que el producto entro a la casa o inventario. - "fechaProduccion" es la fecha de fabricacion, emision o produccion. - Si no se menciona fechaProduccion, usa la fecha actual de referencia. - Si el producto no caduca o el usuario dice "no vence", "no caduca" o equivalente, usa "{NO_EXPIRY_DATE}" en fechaCaducidad. - Entiende cantidades habladas como "cinco", "medio", "una", "dos", "media docena" cuando sea razonable. - Entiende monedas y precios hablados como "veintiuno con noventa", "21.9 soles", "cuatro con veinte". - Normaliza unidades comunes: kg, kilo, kilos, g, gramo, gramos, litro, litros, ml, unidad, unidades, paquete, paquetes, lata, latas, botella, botellas. - Si hay varias frases largas, elige el producto que tenga mas datos completos. - Si falta fechaIngreso, usa la fecha actual de referencia. - Si un campo no aparece, usa 0 o cadena vacia. - No inventes datos no mencionados explicitamente o inferibles de forma directa. """ text = await call_claude( system, f"Texto a estructurar:\n{raw_text}\n\nDevuelve solo el JSON final del producto principal.", ) return extract_json(text) async def extract_products_batch(raw_text: str, source: str) -> list[dict[str, Any]]: today = today_in_config_timezone() system = f""" Eres un extractor de inventario domestico para audios largos de compras. Debes leer una transcripcion larga y devolver una LISTA JSON de productos. No devuelvas explicaciones. Solo JSON. Fecha actual de referencia: {today} Cada elemento debe seguir este esquema: {{ "producto": "string", "precio": 0, "cantidad": 0, "unidad": "unidad", "fechaCaducidad": "YYYY-MM-DD", "fechaIngreso": "YYYY-MM-DD", "fechaProduccion": "YYYY-MM-DD", "notas": "string", "fuente": "{source}" }} Reglas: - Devuelve una lista con todos los productos identificables de la transcripcion. - Si un producto no tiene suficientes datos minimos para registro, omitelo. - Datos minimos: producto, cantidad, unidad, fechaIngreso, fechaProduccion y fechaCaducidad. - Convierte fechas en espanol a YYYY-MM-DD. - Si el usuario dice "hoy", usa la fecha actual de referencia. - Si no se menciona fechaIngreso, usa la fecha actual de referencia. - Si no se menciona fechaProduccion, usa la fecha actual de referencia. - Entiende cantidades habladas como "cinco", "medio", "una", "dos", "media docena" cuando sea razonable. - Entiende precios hablados como "veintiuno con noventa", "30 con 16", "7 soles con 62". - Si se menciona "no vence", "no caduca" o equivalente, usa "{NO_EXPIRY_DATE}" en fechaCaducidad. - Si hay incertidumbre tipo "ponle", "aproximadamente", "más o menos", extraela igualmente y guarda esa aclaracion breve en notas. - Normaliza unidades comunes: kg, kilo, kilos, g, gramo, gramos, litro, litros, ml, unidad, unidades, paquete, paquetes, lata, latas, botella, botellas. - No inventes productos ni fechas no mencionadas. """ text = await call_claude( system, f"Transcripcion larga:\n{raw_text}\n\nDevuelve solo una lista JSON con todos los productos registrables.", max_tokens=2400, ) return extract_json_list(text) async def extract_products_from_receipt_image( image_bytes: bytes, source: str, media_type: str = "image/jpeg", ) -> list[dict[str, Any]]: today = today_in_config_timezone() categories = ", ".join(list_supported_categories()) system = f""" Eres un extractor de productos desde imagenes de boletas o tickets. Debes devolver solo una LISTA JSON. Fecha actual de referencia: {today} Categorias permitidas: {categories} Cada elemento debe seguir este esquema: {{ "producto": "string", "precio": 0, "cantidad": 0, "unidad": "unidad", "fechaCaducidad": "", "fechaIngreso": "{today}", "fechaProduccion": "{today}", "categoria": "una_categoria_valida", "caducidadEstimada": false, "notas": "string", "fuente": "{source}" }} Reglas: - Extrae todos los productos identificables de la compra. - Prioriza precision sobre cantidad: si una linea es dudosa o ilegible, omitela. - No inventes marcas, sabores ni palabras que no se vean con suficiente claridad. - Si el nombre exacto no es legible, devuelve un nombre mas generico y corto en vez de alucinar texto. - Si varias lineas parecen ser la misma referencia repetida por la misma boleta o por otra captura del mismo ticket, usa un nombre consistente. - Si la boleta muestra fecha de compra, usala como fechaIngreso para todos los items; si no, usa la fecha actual de referencia. - Si ves una fecha de vencimiento real asociada a un producto, usala y deja caducidadEstimada en false. - Si no ves fecha de vencimiento real, deja fechaCaducidad vacia, asigna una categoria valida y deja caducidadEstimada en false; el backend estimara la fecha. - Si no ves fecha de produccion, usa la fecha actual de referencia. - Si la linea muestra peso, volumen o multiplicador como kg, g, ml, L, x2, x3, usa esa cantidad y unidad. - Si no puedes saber cantidad exacta, usa 1 solo cuando el item realmente parezca una unidad individual. - Usa "unidad" solo si no se ve una unidad mejor. - Si no puedes extraer un producto con nombre y precio razonables, omitelo. - No devuelvas texto fuera del JSON. """ prompt = """ Analiza esta imagen de boleta o ticket y devuelve una lista JSON con productos registrables. Si la imagen parece ser una captura parcial de un ticket largo, extrae solo las lineas claramente visibles. No intentes completar texto cortado. """ text = await call_claude_with_image(system, prompt, image_bytes, media_type=media_type, max_tokens=2800) return extract_json_list(text) async def extract_consumption(raw_text: str, source: str) -> dict[str, Any]: today = today_in_config_timezone() system = f""" Eres un extractor de consumo de inventario para texto libre y audio transcrito. Devuelve solo JSON exacto. Fecha actual de referencia: {today} Esquema: {{ "producto": "string", "cantidad": 0, "unidad": "unidad", "notas": "string", "fuente": "{source}" }} Reglas: - Entiende cantidades habladas en espanol. - Si el usuario agrega contexto como "para desayuno" o "use medio kilo", mandalo a notas si no forma parte del nombre. - Elige solo un producto principal. """ text = await call_claude(system, f"Texto de consumo:\n{raw_text}\n\nDevuelve solo el JSON final.") return extract_json(text) async def route_telegram_text(raw_text: str, memory_context: str = "") -> dict[str, Any]: system = """ Eres un router de intenciones para un bot de inventario domestico por Telegram. Debes decidir que quiere hacer el usuario y devolver solo JSON exacto. Esquema: { "accion": "registrar|consumir|stock|buscar|vencimientos|diagrama|memoria|ayuda", "texto": "texto util para la accion", "producto": "nombre corto del producto si aplica", "pregunta": "pregunta para consulta si aplica", "instruccion": "instruccion de diagrama si aplica" } Reglas: - "registrar" cuando el usuario quiere ingresar compras, productos nuevos, lotes, fotos, boletas o agregar inventario. - "consumir" cuando el usuario dice que uso, gasto, consumio o queda menos de un producto. - "stock" cuando pregunta cuanto queda, cuanto hay, si hay stock o disponibilidad de un producto. - "vencimientos" cuando pregunta que vence pronto, proximos a vencer o caducidades. - "diagrama" cuando pide un cuadro, flujo, grafico o diagrama. - "memoria" cuando pregunta que dijo antes, que hablaron, que recuerdas o hace referencia al contexto reciente. - "buscar" para preguntas generales sobre los registros, comparaciones o analisis. - "ayuda" solo si el texto es saludo, ambiguo o no alcanza para decidir. - Conserva en "texto" el mensaje original limpiado. - Si aplica, llena "producto", "pregunta" o "instruccion". """ user_prompt = f"Mensaje del usuario:\n{raw_text}\n" if memory_context.strip(): user_prompt += f"\nContexto del chat:\n{memory_context}\n" user_prompt += "\nDevuelve solo el JSON del routing." text = await call_claude(system, user_prompt, max_tokens=400) return extract_json(text) def _records_to_context(records: list[dict[str, Any]]) -> str: return "\n".join( f"{index + 1}. producto={r.get('producto')}; precio={r.get('precio')}; cantidad={r.get('cantidad')}; unidad={r.get('unidad')}; stockActual={r.get('stockActual')}; fechaCaducidad={r.get('fechaCaducidad')}; consumidoTotal={r.get('consumidoTotal')}" for index, r in enumerate(records) ) async def answer_question(question: str, records: list[dict[str, Any]]) -> str: system = """ Eres un asistente de inventario domestico. Responde solo con informacion sustentada por los registros. Si los datos no alcanzan, dilo claramente. """ return await call_claude(system, f"Pregunta:\n{question}\n\nRegistros:\n{_records_to_context(records)}") async def build_mermaid(instruction: str, records: list[dict[str, Any]]) -> str: system = """ Genera solo codigo Mermaid valido. No uses markdown. """ return await call_claude( system, f"Instruccion:\n{instruction}\n\nRegistros:\n{_records_to_context(records)}", ) def _safe_label(label: str) -> str: return re.sub(r"[^a-zA-Z0-9 ]+", "", label).strip()[:18] or "item" def build_inventory_chart(instruction: str, records: list[dict[str, Any]]) -> str: lowered = instruction.lower() if "consumo" in lowered or "gast" in lowered: ordered = sorted(records, key=lambda item: float(item.get("consumidoTotal", 0) or 0), reverse=True)[:8] title = "Consumo acumulado" series_name = "Consumido" values = [float(item.get("consumidoTotal", 0) or 0) for item in ordered] elif "vence" in lowered or "caduc" in lowered: ordered = sorted( [item for item in records if item.get("fechaCaducidad") not in ("", NO_EXPIRY_DATE)], key=lambda item: item.get("fechaCaducidad", ""), )[:8] title = "Productos por vencer" series_name = "Stock" values = [float(item.get("stockActual", 0) or 0) for item in ordered] else: ordered = sorted(records, key=lambda item: float(item.get("stockActual", 0) or 0), reverse=True)[:8] title = "Stock actual" series_name = "Stock" values = [float(item.get("stockActual", 0) or 0) for item in ordered] if not ordered: return "xychart-beta\ntitle \"Sin datos\"\nx-axis []\nbar []" labels = ", ".join(f"\"{_safe_label(item.get('producto', 'item'))}\"" for item in ordered) bar_values = ", ".join(str(round(value, 2)) for value in values) return ( "xychart-beta\n" f"title \"{title}\"\n" f"x-axis [{labels}]\n" f"bar \"{series_name}\" [{bar_values}]" )