# -*- coding: utf-8 -*- """Untitled19.ipynb Automatically generated by Colab. Original file is located at https://colab.research.google.com/drive/1BHGLAUd1RoMirSlz-R_eNGfVcNdcl8p8 """ # INSTALACIÓN DE DEPENDENCIAS !pip install -q transformers sentence-transformers torch chromadb pypdf2 gradio !pip install -q pypdf # IMPORTACIÓN DE LIBRERÍAS from pypdf import PdfReader from sentence_transformers import SentenceTransformer import chromadb from transformers import pipeline from google.colab import files import gradio as gr import uuid import re import os # CARGA DE ARCHIVOS PDF print("Selecciona los archivos PDF del reglamento (Pregrado y Posgrado)...") uploaded = files.upload() print("\nArchivos cargados correctamente:") for fn in uploaded.keys(): print(f" • {fn}") # Guardar PDFs en Colab for fn, data in uploaded.items(): with open(fn, "wb") as f: f.write(data) print("\nPDF guardados correctamente.") # EXTRACCIÓN DE TEXTO con preservación de offsets def extract_text_from_pdf(path): reader = PdfReader(path) texts = [] for page in reader.pages: txt = page.extract_text() if txt: texts.append(txt) return "\n".join(texts) # Extraer todo el texto y conservar en full_text docs_texts = [] for fn in uploaded.keys(): if fn.lower().endswith(".pdf"): print(f"📘 Extrayendo texto de {fn}") t = extract_text_from_pdf(fn) docs_texts.append(t) full_text = "\n\n".join(docs_texts).strip() # # NORMALIZACIONES BÁSICAS # = # saltos múltiples full_text = re.sub(r"\n{3,}", "\n\n", full_text) # CHUNKING CON MAPEO DE POSICIONES (start,end) # - Generamos chunks por párrafos largos con solapamiento # - Guardamos el offset inicial de cada chunk en el texto completo def make_chunks_with_offsets(text, chunk_size_chars=1200, overlap_chars=200): chunks = [] starts = [] text_len = len(text) start = 0 while start < text_len: end = min(start + chunk_size_chars, text_len) chunk = text[start:end] chunks.append(chunk) starts.append(start) if end == text_len: break start = max(0, end - overlap_chars) return chunks, starts # Ajusta tamaños chunks, chunk_starts = make_chunks_with_offsets(full_text, chunk_size_chars=1200, overlap_chars=200) print("\nChunks creados:", len(chunks)) # EMBEDDINGS (Sentence-Transformers) embedding_model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2') def get_embeddings(text_list): return embedding_model.encode(text_list, show_progress_bar=True, convert_to_numpy=True) # # CHROMADB (persistente) # PERSIST_DIR = "chroma_persist" client = chromadb.PersistentClient(path=PERSIST_DIR) collection_name = "reglamentos_unab" try: collection = client.get_collection(collection_name) except Exception: collection = client.create_collection(collection_name) # Antes de poblar: si la colección está vacía, añadir. Si ya existe, evitamos duplicar. existing_count = 0 try: existing_count = len(collection.get()['documents']) except Exception: existing_count = 0 if existing_count == 0: print("Poblando ChromaDB con embeddings (esto puede tardar)...") ids = [str(uuid.uuid4()) for _ in chunks] embeddings = get_embeddings(chunks).tolist() metadatas = [{"start": int(s)} for s in chunk_starts] # guardamos el offset inicial collection.add(ids=ids, documents=chunks, metadatas=metadatas, embeddings=embeddings) try: client.persist() except Exception: pass print("Colección poblada correctamente.") else: print(f"ChromaDB ya contiene {existing_count} documentos — se omite reindexado.") # MODELO QA (extractivo) - en español try: qa_pipeline = pipeline( "question-answering", model="mrm8488/distill-bert-base-spanish-wwm-cased-finetuned-spa-squad2-es", tokenizer="mrm8488/distill-bert-base-spanish-wwm-cased-finetuned-spa-squad2-es", device=0 ) except Exception: qa_pipeline = pipeline( "question-answering", model="mrm8488/distill-bert-base-spanish-wwm-cased-finetuned-spa-squad2-es", tokenizer="mrm8488/distill-bert-base-spanish-wwm-cased-finetuned-spa-squad2-es" ) # # UTIL: detectar límites de artículo (ARTICULO/ART.) article_marker_regex = re.compile(r"\b(?:ART(?:ÍCULO|ICULO|\.?)\s*\d+|CAP[IÍ]TULO\b)", re.IGNORECASE) def expand_to_article_bounds(text, min_pos, max_pos): """ Dado un min_pos y max_pos (offsets char en full_text), buscamos el inicio del artículo anterior (último marcador antes de min_pos) y el inicio del siguiente marcador después de max_pos, y devolvemos el segmento. """ # buscar inicio anterior: último match cuyo .start() <= min_pos starts = [m.start() for m in article_marker_regex.finditer(text)] if not starts: # No hay marcadores: devolver desde min_pos a max_pos return text[min_pos:max_pos].strip() prev_starts = [s for s in starts if s <= min_pos] start_idx = prev_starts[-1] if prev_starts else 0 # buscar siguiente marcador después de max_pos next_starts = [s for s in starts if s > max_pos] end_idx = next_starts[0] if next_starts else len(text) return text[start_idx:end_idx].strip() # FUNCIONES DE BÚSQUEDA def buscar_articulo_exacto(frase): """ Busca la frase exacta en full_text; si aparece devuelve el artículo completo que la contiene (según markers). """ if not frase or not frase.strip(): return None idx = full_text.lower().find(frase.strip().lower()) if idx == -1: return None # expandir al artículo # encontrar fin correlativo # usamos expand_to_article_bounds con min_pos=idx, max_pos=idx+len(frase) seg = expand_to_article_bounds(full_text, idx, idx + len(frase)) return seg def rag_retrieve_and_expand(pregunta, top_k=5): """ Recupera top_k chunks semánticamente y luego expande al artículo que contiene los chunks recuperados. """ q_emb = embedding_model.encode([pregunta], convert_to_numpy=True)[0].tolist() results = collection.query( query_embeddings=[q_emb], n_results=top_k, include=['documents', 'metadatas', 'distances'] ) docs = results.get('documents', [[]])[0] metas = results.get('metadatas', [[]])[0] if not docs: return None # calcular min start y max end aproximado starts = [] for m in metas: try: starts.append(int(m.get('start', 0))) except Exception: starts.append(0) min_pos = min(starts) if starts else 0 max_pos_candidates = [] for doc, m in zip(docs, metas): s = int(m.get('start', 0)) if m else 0 max_pos_candidates.append(s + len(doc)) max_pos = max(max_pos_candidates) if max_pos_candidates else (min_pos + sum(len(d) for d in docs)) # expandir a límites de artículo segmento = expand_to_article_bounds(full_text, min_pos, max_pos) # evitar devolver textos vacíos return segmento if segmento.strip() else None # FUNCIÓN PRINCIPAL: decidir modo y devolver resultado def obtener_respuesta_avanzada(pregunta, modo="Artículo exacto", top_k=5): pregunta = (pregunta or "").strip() if not pregunta: return "Escriba una pregunta o frase para buscar." # Modo 1: Artículo exacto if modo == "Artículo exacto": exact = buscar_articulo_exacto(pregunta) if exact: return exact else: return "No se encontró coincidencia exacta en el texto." # Modo 2: RAG (artículo expandido) if modo == "RAG (artículos expandidos)": seg = rag_retrieve_and_expand(pregunta, top_k=top_k) if seg: return seg else: return "No se encontraron fragmentos relevantes." # Modo 3: RAG + QA (respuesta extractiva) if modo == "RAG + QA": seg = rag_retrieve_and_expand(pregunta, top_k=top_k) if not seg: return "No se encontraron fragmentos relevantes para generar respuesta." # llamar al pipeline QA con contexto = seg (control de longitud si hace falta) context = seg if len(context) > 3000: context = context[:3000] try: out = qa_pipeline(question=pregunta, context=context) answer = out.get('answer', '').strip() score = out.get('score', 0.0) return f"{answer}\n\n(Confianza: {score:.3f})\n\nFuente (fragmento):\n{seg[:3000]}" except Exception as e: return f"Error en QA pipeline: {str(e)}" return "Modo no reconocido." # INTERFAZ GRADIO with gr.Blocks(title="Asistente Reglamentos UNAB (RAG avanzado)") as demo: gr.Markdown("Asistente Reglamentos UNAB") gr.Markdown("Selecciona modo: 'Artículo exacto' devuelve el artículo completo si la frase aparece textualmente; 'RAG (artículos expandidos)' busca semánticamente y expande al artículo; 'RAG + QA' devuelve respuesta extractiva basada en los artículos recuperados.") with gr.Row(): pregunta = gr.Textbox(lines=2, placeholder="Escriba la pregunta o frase exacta...", label="Pregunta / Frase") modo = gr.Radio(["Artículo exacto", "RAG (artículos expandidos)", "RAG + QA"], value="Artículo exacto", label="Modo de búsqueda") topk = gr.Slider(1, 10, value=5, step=1, label="Top-k (para RAG)") with gr.Row(): btn = gr.Button("Buscar") with gr.Row(): salida = gr.Textbox(lines=25, label="Resultado (artículo / respuesta)") btn.click(fn=lambda q, m, k: obtener_respuesta_avanzada(q, modo=m, top_k=k), inputs=[pregunta, modo, topk], outputs=[salida]) gr.Markdown("---") gr.Markdown("### 📄 Ver texto completo extraído") with gr.Row(): show_full_btn = gr.Button("Mostrar texto completo") full_out = gr.Textbox(lines=30, label="Texto completo extraído") show_full_btn.click(fn=lambda: full_text or "No hay texto extraído.", inputs=None, outputs=full_out) demo.launch(share=True)