#***************************************************************************
# Mori (tech-only) — Streamlit app without social features, with optional RAG
#***************************************************************************
import os, sys, warnings, json, joblib, random, re, unicodedata, uuid, csv

os.environ["TF_ENABLE_ONEDNN_OPTS"] = "0"

import streamlit as st
import datetime as dt
from pathlib import Path

import torch
import numpy as np
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, AutoModelForSequenceClassification
from huggingface_hub import hf_hub_download
from sentence_transformers import SentenceTransformer  # RAG embeddings

# =========================
# General configuration
# =========================
HF_TOKEN = os.environ.get("HF_TOKEN")  # Private token (set it in Secrets or as an environment variable)

#***************************************************************************
# Sidebar controls for generation params
#***************************************************************************
def sidebar_params():
    with st.sidebar:
        st.title("🎮 Adjustments (T5-Base)")
        ss = st.session_state

        # Defaults (first run only); advanced settings start hidden
        if "show_llm_controls" not in ss:
            ss.show_llm_controls = False
        ss.setdefault("persona", "Normal")
        ss.setdefault("mode", "beam")  # 'beam' | 'sampling'
        ss.setdefault("max_new", 128)
        ss.setdefault("min_tok", 16)
        ss.setdefault("no_repeat", 3)
        ss.setdefault("num_beams", 4)
        ss.setdefault("length_penalty", 1.0)
        ss.setdefault("temperature", 0.7)
        ss.setdefault("top_p", 0.9)
        ss.setdefault("repetition_penalty", 1.0)

        # ----------------------------
        # Personality presets
        # ----------------------------
        st.header("💡 Predefined Personalities")
        c1, c2 = st.columns(2)
        with c1:
            if st.button("Normal 🧐", use_container_width=True):
                ss.update({
                    "persona": "Normal", "mode": "beam",
                    "num_beams": 1, "max_new": 92, "min_tok": 32,
                    "no_repeat": 3, "length_penalty": 0.3,
                    "temperature": 0.4, "top_p": 0.9,
                    "repetition_penalty": 0.4,
                })
                st.rerun()
        with c2:
            if st.button("Enthusiastic 😃", use_container_width=True):
                ss.update({
                    "persona": "Enthusiastic",
                    "mode": "sampling",
                    "max_new": 192, "min_tok": 48, "no_repeat": 3,
                    "temperature": 0.8, "top_p": 0.95,
                    "repetition_penalty": 1.0,
                })
                st.rerun()
        st.caption(f"Selected Personality: **{ss.persona}**")

        # ----------------------------
        # Show/hide button for the advanced parameters
        # ----------------------------
        if st.button(("🔼 Hide" if ss.show_llm_controls else "🔽 Show") + " Advanced Settings"):
            ss.show_llm_controls = not ss.show_llm_controls
            st.rerun()

        # ----------------------------
        # Model controls (sliders, strategy, etc.)
        # ----------------------------
        if ss.show_llm_controls:
            st.header("⚙️ Manual Adjustments")
            st.subheader("📝 Text Generation")
            picked = st.radio(
                "Strategy",
                ["Beam search (stable)", "Sampling (creative)"],
                index=0 if ss.mode == "beam" else 1,
                help="https://huggingface.co/docs/transformers/generation_strategies"
            )
            ss.mode = "beam" if picked.startswith("Beam") else "sampling"

            st.subheader("🔧 LLM text generation parameters")
            ss.max_new = st.slider(
                "max_new_tokens", 16, 256, int(ss.max_new), step=8,
                help="https://huggingface.co/docs/transformers/main_classes/text_generation"
            )
            ss.min_tok = st.slider(
                "min_tokens", 0, int(ss.max_new), int(ss.min_tok),
                help="https://huggingface.co/docs/transformers/main_classes/text_generation"
            )
            ss.no_repeat = st.slider(
                "no_repeat_ngram_size", 0, 6, int(ss.no_repeat),
                help="https://huggingface.co/docs/transformers/main_classes/text_generation"
            )

            # Mode-specific sub-controls (num_beams starts at 1 so the
            # "Normal" preset's num_beams=1 stays within the slider range)
            if ss.mode == "beam":
                ss.num_beams = st.slider(
                    "num_beams", 1, 8, int(ss.num_beams),
                    help="https://huggingface.co/docs/transformers/main_classes/text_generation"
                )
                ss.length_penalty = st.slider(
                    "length_penalty", 0.0, 2.0, float(ss.length_penalty), step=0.1,
                    help="https://huggingface.co/docs/transformers/main_classes/text_generation"
                )
            else:
                ss.temperature = st.slider(
                    "temperature", 0.1, 1.5, float(ss.temperature), step=0.05,
                    help="https://huggingface.co/docs/transformers/main_classes/text_generation"
                )
                ss.top_p = st.slider(
                    "top_p", 0.5, 1.0, float(ss.top_p), step=0.01,
                    help="https://huggingface.co/docs/transformers/main_classes/text_generation"
                )

            if st.session_state.get("last_prompt"):
                with st.expander("Show generated prompt"):
                    st.text_area(
                        "Current prompt:",
                        st.session_state["last_prompt"],
                        height=200,
                        disabled=True
                    )
            else:
                st.caption("👉 No prompt is available yet.")

        # ----------------------------
        # Build the parameter dictionary
        # ----------------------------
        params = {
            "persona": ss.persona,
            "mode": ss.mode,
            "max_new_tokens": int(ss.max_new),
            "min_tokens": int(ss.min_tok),
            "no_repeat_ngram_size": int(ss.no_repeat),
            "repetition_penalty": float(ss.repetition_penalty),
        }
        if ss.mode == "beam":
            params.update({
                "num_beams": int(ss.num_beams),
                "length_penalty": float(ss.length_penalty),
            })
        else:
            params.update({
                "temperature": float(ss.temperature),
                "top_p": float(ss.top_p),
            })
        return params

#***************************************************************************
# Functions
#***************************************************************************
def truncate_sentences(text: str, max_sentences: int = 4) -> str:
    # Split on sentence-ending punctuation and keep at most max_sentences
    _SENT_SPLIT = re.compile(r'(?<=[\.\!\?…])\s+')
    s = text.strip()
    if not s:
        return s
    parts = _SENT_SPLIT.split(s)
    cut = " ".join(parts[:max_sentences]).strip()
    if cut and cut[-1] not in ".!?…":
        cut += "."
    return cut
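# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original app): how the dictionary
# returned by sidebar_params() could be translated into keyword arguments for
# transformers' model.generate(). The actual consumer of these params is
# defined further down in the app; the mapping below — in particular
# "min_tokens" -> min_new_tokens and do_sample=True for sampling mode — is an
# assumption about that elided code, not a copy of it.
# ---------------------------------------------------------------------------
def build_generate_kwargs(params: dict) -> dict:
    kwargs = {
        "max_new_tokens": params["max_new_tokens"],
        "min_new_tokens": params["min_tokens"],  # assumed mapping for the app's "min_tokens"
        "no_repeat_ngram_size": params["no_repeat_ngram_size"],
        "repetition_penalty": params["repetition_penalty"],
    }
    if params["mode"] == "beam":
        # Deterministic beam search: width and length preference
        kwargs.update(num_beams=params["num_beams"],
                      length_penalty=params["length_penalty"])
    else:
        # Stochastic decoding: nucleus sampling with temperature
        kwargs.update(do_sample=True,
                      temperature=params["temperature"],
                      top_p=params["top_p"])
    return kwargs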
def _load_json_safe(path: Path, fallback: dict) -> dict:
    try:
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    except Exception:
        return fallback

# Clear the question field
def limpiar_input():
    st.session_state["entrada"] = ""

# Resolve model folders relative to the repo's Models directory
def get_model_path(folder_name):
    return Path("Models") / folder_name

# Save a user interaction to the CSV and JSONL logs
def saving_interaction(question, response, context, user_id):
    '''
    inputs:
        question --> user input question
        response --> assistant response to the user question
        context  --> context for the user input, chosen by the trained classifier
        user_id  --> ID for the current user (unique per session)
    '''
    timestamp = dt.datetime.now().isoformat()
    stats_dir = Path("Statistics")
    stats_dir.mkdir(parents=True, exist_ok=True)

    archivo_csv = stats_dir / "conversaciones_log.csv"
    existe_csv = archivo_csv.exists()
    with open(archivo_csv, mode="a", encoding="utf-8", newline="") as f_csv:
        writer = csv.writer(f_csv)
        if not existe_csv:
            writer.writerow(["timestamp", "user_id", "contexto", "pregunta", "respuesta"])
        writer.writerow([timestamp, user_id, context, question, response])

    archivo_jsonl = stats_dir / "conversaciones_log.jsonl"
    with open(archivo_jsonl, mode="a", encoding="utf-8") as f_jsonl:
        registro = {
            "timestamp": timestamp,
            "user_id": user_id,
            "context": context,
            "pregunta": question,
            "respuesta": response,
        }
        f_jsonl.write(json.dumps(registro, ensure_ascii=False) + "\n")

# Load a local seq2seq model, cached across Streamlit reruns
@st.cache_resource
def load_model(path_str):
    path = Path(path_str).resolve()
    tokenizer = AutoTokenizer.from_pretrained(path, local_files_only=True)
    model = AutoModelForSeq2SeqLM.from_pretrained(path, local_files_only=True)
    return model, tokenizer

#-------------------------------------------------------------------------
# Function to correct Spanish sentences' punctuation and missing characters
#-------------------------------------------------------------------------
def polish_spanish(s: str) -> str:
    s = unicodedata.normalize("NFC", s).strip()
    s = re.sub(r'\s*[\[\(]\s*Assistant\s+(?:Social|T[eé]nico|T[eé]cnico)\s*[\]\)]\s*', '', s, flags=re.I)
    fixes = [
        (r'(?i)(^|\W)T\s+puedes(?P<tail>[^\w]|$)', r'\1Tú puedes\g<tail>'),
        (r'(?i)(^|\W)T\s+(ya|eres|estas|estás|tienes|puedes)\b', r'\1Tú \2'),
        (r'(?i)\bclaro que s(?:i|í)?\b(?P<p>[,.\!?…])?', r'Claro que sí\g<p>'),
(r'(?i)(^|\s)si,', r'\1Sí,'),
(r'(?i)(\beso\s+)s(\s+est[áa]\b)', r'\1sí\2'),
(r'(?i)(^|[\s,;:])s(\s+es\b)', r'\1sí\2'),
(r'(?i)\btiles\b', 'útiles'),
(r'(?i)\butiles\b', 'útiles'),
(r'(?i)\butil\b', 'útil'),
(r'(?i)\baqui\b', 'aquí'),
(r'(?i)\baqu\b(?=\s+estoy\b)', 'aquí'),
(r'(?i)\balgn\b', 'algún'),
(r'(?i)\balgun\b', 'algún'),
(r'(?i)\bAnimo\b', 'Ánimo'),
(r'(?i)\bcario\b', 'cariño'),
(r'(?i)\baprendisaje\b', 'aprendizaje'),
(r'(?i)\bmanana\b', 'mañana'),
(r'(?i)\bmaana\b', 'mañana'),
(r'(?i)\benergia\b', 'energía'),
(r'(?i)\benerga\b', 'energía'),
(r'(?i)\bextrano\b', 'extraño'),
(r'(?i)\bextrana\b', 'extraña'),
(r'(?i)\bextranar\b', 'extrañar'),
(r'(?i)\bextranarte\b', 'extrañarte'),
(r'(?i)\bextranas\b', 'extrañas'),
(r'(?i)\bextranos\b', 'extraños'),
(r'(?i)\baqu\b', 'aquí'),
(r'(?i)\bestare\b', 'estaré'),
(r'(?i)\bclarn\b', 'clarín'),
(r'(?i)\bclarin\b', 'clarín'),
(r'(?i)\bclar[íi]n\s+cornetas\b', 'clarín cornetas'),
(r'(?i)(^|\s)s([,.;:!?])', r'\1Sí\2'),
(r'(?i)\bfutbol\b', 'fútbol'),
(r'(?i)(^|\s)as(\s+se\b)', r'\1Así\2'),
(r'(?i)(^|\s)s(\s+orientarte\b)', r'\1sí\2'),
(r'(?i)\bbuen dia\b', 'buen día'),
(r'(?i)\bgran dia\b', 'gran día'),
(r'(?i)\bdias\b', 'días'),
(r'(?i)\bdia\b', 'día'),
(r'(?i)\bgran da\b', 'gran día'),
(r'(?i)\bacompa?a(r|rte|do|da|dos|das)?\b', r'acompaña\1'),
(r'(?i)(^|\s)as([,.;:!?]|\s|$)', r'\1así\2'),
(r'(?i)(^|\s)S lo se\b', r'\1Sí lo sé'),
(r'(?i)(^|\s)S lo sé\b', r'\1Sí lo sé'),
(r'(?i)\bcudese\b', 'cuídese'),
(r'(?i)\bpequeo\b', 'pequeño'),
(r'(?i)\bpequea\b', 'pequeña'),
(r'(?i)\bpequeos\b', 'pequeños'),
(r'(?i)\bpequeas\b', 'pequeñas'),
(r'(?i)\bunico\b', 'único'),
(r'(?i)\bunica\b', 'única'),
(r'(?i)\bunicos\b', 'únicos'),
(r'(?i)\bunicas\b', 'únicas'),
(r'(?i)\bnico\b', 'único'),
(r'(?i)\bnica\b', 'única'),
(r'(?i)\bnicos\b', 'únicos'),
(r'(?i)\bnicas\b', 'únicas'),
(r'(?i)\bestadstico\b', 'estadístico'),
(r'(?i)\bestadstica\b', 'estadística'),
(r'(?i)\bestadsticos\b', 'estadísticos'),
(r'(?i)\bestadsticas\b', 'estadísticas'),
(r'(?i)\bcudate\b', 'cuídate'),
(r'(?i)\bcuidate\b', 'cuídate'),
(r'(?i)\bcuidese\b', 'cuídese'),
(r'(?i)\bcuidense\b', 'cuídense'),
(r'(?i)\bcudense\b', 'cuídense'),
(r'(?i)\bgracias por confiar en m\b', 'gracias por confiar en mí'),
(r'(?i)\bcada dia\b', 'cada día'),
(r'(?i)\bcada da\b', 'cada día'),
(r'(?i)\bsegun\b', 'según'),
(r'(?i)\bcaracteristica(s)?\b', r'característica\1'),
(r'(?i)\bcaracterstica(s)?\b', r'característica\1'),
(r'(?i)\b([a-záéíóúñ]+)cion\b', r'\1ción'),
(r'(?i)\bdeterminacio\b', 'determinación'),
]
for pat, rep in fixes:
s = re.sub(pat, rep, s)
    # Final touches (the original line was truncated in the source; the two
    # substitutions below reconstruct its apparent intent): drop a leading
    # "eso es todo!" and capitalize the first letter of each sentence.
    s = re.sub(r'(?i)^eso es todo!\s*', '', s)
    s = re.sub(
        r'(?P<sep>^|[\.\!…]\s+)(?P<ch>[a-záéíóúñ])',
        lambda m: m.group('sep') + m.group('ch').upper(),
        s,
    )
    return s
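# Example of the kind of repair polish_spanish performs (hypothetical inputs
# showing the mojibake the generator tends to emit when accented characters
# are lost; the capital "Q" in the second output assumes the
# sentence-capitalization pass reconstructed above):
#
#   polish_spanish("si, aqui estoy para ti")  -> "Sí, aquí estoy para ti"
#   polish_spanish("que tengas un buen dia")  -> "Que tengas un buen día"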
", unsafe_allow_html=True)
st.caption("✏️ Type **'salir'** to exit.")
# Safe cleanup of the text area before the form is rendered
if st.session_state.pop("_clear_entrada", False):
    if "entrada" in st.session_state:
        del st.session_state["entrada"]

# Response flash (stored here, but displayed after the form)
_flash = st.session_state.pop("_flash_response", None)

with st.form("formulario_assistant"):
    user_question = st.text_area("📝 Escribe tu pregunta aquí", key="entrada", height=100)
    submitted = st.form_submit_button("Responder")

    if submitted:
        if not user_question:
            st.info("Chatbot: ¿Podrías repetir eso? No entendí bien 😅")
        else:
            response, context = contextual_asnwer(
                user_question, label_classes, context_model, cont_tok,
                tec_model, tec_tok, soc_model, soc_tok, device,
                gen_params=GEN_PARAMS, block_web=True,
            )
            # Append the exchange to the chat history
            hora_actual = dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            st.session_state.historial.append(("Tú", user_question, hora_actual))
            st.session_state.historial.append(("Assistant", response, hora_actual))
            # Persist the conversation
            saving_interaction(user_question, response, context, st.session_state["user_id"])
            # Store the response so it can be shown after the rerun
            st.session_state["_flash_response"] = response
            # Clear the text area on the next cycle
            st.session_state["_clear_entrada"] = True
            # Force a refresh (the sidebar will see the new prompt)
            st.rerun()

# -----------------------------------------------------------
# Show the current (flash) response right here
# -----------------------------------------------------------
if _flash:
    st.success(_flash)
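# Why the "_clear_entrada" / "_flash_response" flags exist: Streamlit
# generally disallows overwriting a widget-backed session-state key once the
# widget has been instantiated in the current run, so the form sets flags
# before st.rerun() and consumes them at the top of the next run. A minimal,
# self-contained sketch of the same idiom (hypothetical keys "box" / "_msg";
# this demo function is never called by the app):
def _flash_and_clear_demo():
    if st.session_state.pop("_clear_box", False):
        st.session_state.pop("box", None)  # clear before the widget renders
    msg = st.session_state.pop("_msg", None)
    with st.form("demo_form"):
        text = st.text_area("Input", key="box")
        if st.form_submit_button("Go"):
            st.session_state["_msg"] = f"echo: {text}"
            st.session_state["_clear_box"] = True
            st.rerun()
    if msg:
        st.success(msg)  # shown on the run after submit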
# Chat-style history inside a scrollable container
if st.session_state.historial:
    st.markdown("---")

    # Download button above the history
    lineas = []
    for msg in reversed(st.session_state.historial):
        if len(msg) == 3:
            autor, texto, hora = msg
            lineas.append(f"[{hora}] {autor}: {texto}")
        else:
            # Backward compatibility with entries saved without a timestamp
            autor, texto = msg
            lineas.append(f"{autor}: {texto}")
    texto_chat = "\n\n".join(lineas)
    st.download_button(
        label="💾 Descargar conversación como .txt",
        data=texto_chat,
        file_name="conversacion_assistant.txt",
        mime="text/plain",
        use_container_width=True
    )

    # Scrollable container with chat bubbles
    st.markdown(
        """