| | |
| | """Untitled19.ipynb |
| | |
| | Automatically generated by Colab. |
| | |
| | Original file is located at |
| | https://colab.research.google.com/drive/1BHGLAUd1RoMirSlz-R_eNGfVcNdcl8p8 |
| | """ |
| |
|
| | |
| |
|
| | !pip install -q transformers sentence-transformers torch chromadb pypdf2 gradio |
| | !pip install -q pypdf |
| |
|
| | |
| |
|
| | from pypdf import PdfReader |
| | from sentence_transformers import SentenceTransformer |
| | import chromadb |
| | from transformers import pipeline |
| | from google.colab import files |
| | import gradio as gr |
| | import uuid |
| | import re |
| | import os |
| |
|
| | |
| |
|
# Ask the user for the regulation PDFs (Colab file picker) and persist each
# uploaded payload to the local filesystem under its original filename so the
# extraction step below can re-open them from disk.
print("Selecciona los archivos PDF del reglamento (Pregrado y Posgrado)...")
uploaded = files.upload()

print("\nArchivos cargados correctamente:")
for nombre in uploaded:
    print(f" • {nombre}")

for nombre, contenido in uploaded.items():
    with open(nombre, "wb") as destino:
        destino.write(contenido)

print("\nPDF guardados correctamente.")
| |
|
| |
|
| | |
| |
|
def extract_text_from_pdf(path):
    """Return the text of every page in the PDF at *path*, newline-joined.

    Pages for which pypdf yields no text (None or empty string) are skipped.
    """
    page_texts = (page.extract_text() for page in PdfReader(path).pages)
    return "\n".join(text for text in page_texts if text)
| |
|
| | |
# Extract the text of every uploaded PDF and fuse everything into one corpus
# string (full_text) that the chunker and the article-expansion logic share.
docs_texts = []
for nombre in uploaded:
    if nombre.lower().endswith(".pdf"):
        print(f"📘 Extrayendo texto de {nombre}")
        docs_texts.append(extract_text_from_pdf(nombre))

full_text = "\n\n".join(docs_texts).strip()

# Collapse runs of three or more newlines down to a single blank line.
full_text = re.sub(r"\n{3,}", "\n\n", full_text)
| |
|
| |
|
| | |
| | |
| | |
| |
|
def make_chunks_with_offsets(text, chunk_size_chars=1200, overlap_chars=200):
    """Split *text* into overlapping character chunks.

    Parameters
    ----------
    text : str
        Source text to split.
    chunk_size_chars : int
        Maximum length of each chunk, in characters. Must be positive.
    overlap_chars : int
        Characters shared between consecutive chunks. Must be strictly
        smaller than ``chunk_size_chars`` so the window always advances.

    Returns
    -------
    (list[str], list[int])
        The chunks and, in parallel, each chunk's start offset in ``text``.

    Raises
    ------
    ValueError
        If the parameters would make the loop never advance (the original
        code hung forever when ``overlap_chars >= chunk_size_chars``).
    """
    if chunk_size_chars <= 0:
        raise ValueError("chunk_size_chars must be positive")
    if overlap_chars >= chunk_size_chars:
        raise ValueError("overlap_chars must be smaller than chunk_size_chars")

    chunks = []
    starts = []
    text_len = len(text)
    start = 0
    while start < text_len:
        end = min(start + chunk_size_chars, text_len)
        chunks.append(text[start:end])
        starts.append(start)
        if end == text_len:
            break
        # Always > start because overlap_chars < chunk_size_chars.
        start = end - overlap_chars
    return chunks, starts
| |
|
| | |
# Chunk the full corpus, keeping each chunk's start offset so retrieval can
# later expand hits back to whole articles.
chunks, chunk_starts = make_chunks_with_offsets(
    full_text, chunk_size_chars=1200, overlap_chars=200
)
print(f"\nChunks creados: {len(chunks)}")
| |
|
| |
|
| | |
| |
|
# Multilingual sentence-embedding model (handles Spanish regulation text).
embedding_model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')


def get_embeddings(text_list):
    """Encode *text_list* into a numpy array of sentence embeddings."""
    return embedding_model.encode(
        text_list,
        show_progress_bar=True,
        convert_to_numpy=True,
    )
| |
|
| | |
| | |
| | |
# Persistent Chroma store so a re-run of the notebook can reuse the index.
PERSIST_DIR = "chroma_persist"
client = chromadb.PersistentClient(path=PERSIST_DIR)
collection_name = "reglamentos_unab"

# get_or_create_collection replaces the get/except/create dance: it is a
# single documented call and does not depend on which exception type
# get_collection happens to raise in a given chromadb version.
collection = client.get_or_create_collection(collection_name)
| |
|
| | |
| |
|
# Index the chunks only once: when the persisted collection already holds
# documents, skip the (slow) embedding + insertion step entirely.
try:
    # count() is a cheap metadata query; the original fetched EVERY stored
    # document via collection.get() just to measure its length.
    existing_count = collection.count()
except Exception:
    existing_count = 0

if existing_count == 0:
    print("Poblando ChromaDB con embeddings (esto puede tardar)...")
    ids = [str(uuid.uuid4()) for _ in chunks]
    embeddings = get_embeddings(chunks).tolist()
    # Persist each chunk's character offset so retrieval can expand hits
    # back to the enclosing article inside full_text.
    metadatas = [{"start": int(s)} for s in chunk_starts]
    collection.add(ids=ids, documents=chunks, metadatas=metadatas, embeddings=embeddings)
    # Older chromadb versions required an explicit persist(); newer
    # PersistentClient saves automatically, so a failure here is harmless.
    try:
        client.persist()
    except Exception:
        pass
    print("Colección poblada correctamente.")
else:
    print(f"ChromaDB ya contiene {existing_count} documentos — se omite reindexado.")
| |
|
| |
|
| | |
| |
|
# Spanish extractive-QA model (distilled BERT fine-tuned on SQuAD2-es).
# Hoisted to a constant: the original repeated this literal four times.
QA_MODEL = "mrm8488/distill-bert-base-spanish-wwm-cased-finetuned-spa-squad2-es"

# Prefer GPU (device=0); fall back to CPU when no CUDA device is available.
try:
    qa_pipeline = pipeline(
        "question-answering",
        model=QA_MODEL,
        tokenizer=QA_MODEL,
        device=0,
    )
except Exception:
    qa_pipeline = pipeline(
        "question-answering",
        model=QA_MODEL,
        tokenizer=QA_MODEL,
    )
| |
|
| | |
| | |
| |
|
# Matches the headings that delimit regulation articles/chapters, e.g.
# "ARTÍCULO 12", "ARTICULO 12", "ART. 12" or "CAPÍTULO" (case-insensitive).
article_marker_regex = re.compile(
    r"\b(?:ART(?:ÍCULO|ICULO|\.?)\s*\d+|CAP[IÍ]TULO\b)", re.IGNORECASE
)


def expand_to_article_bounds(text, min_pos, max_pos):
    """Expand a character span to whole-article boundaries.

    Given char offsets *min_pos*/*max_pos* into *text*, return the slice
    running from the last article/chapter marker at or before min_pos up to
    (but excluding) the first marker strictly after max_pos. When the text
    contains no markers at all, the raw [min_pos:max_pos] slice is returned.
    The result is whitespace-stripped.
    """
    marker_positions = [match.start() for match in article_marker_regex.finditer(text)]
    if not marker_positions:
        return text[min_pos:max_pos].strip()

    # Last marker at or before min_pos (markers come back in ascending order).
    lower = 0
    for pos in marker_positions:
        if pos > min_pos:
            break
        lower = pos

    # First marker strictly after max_pos, else the end of the text.
    upper = len(text)
    for pos in marker_positions:
        if pos > max_pos:
            upper = pos
            break

    return text[lower:upper].strip()
| |
|
| | |
| |
|
def buscar_articulo_exacto(frase):
    """Look up *frase* verbatim (case-insensitively) in full_text.

    Returns the complete article containing the first occurrence (per the
    article/chapter markers), or None when the phrase is empty or absent.
    """
    if not frase or not frase.strip():
        return None
    # Normalise once. The original searched for the stripped phrase but used
    # len(frase) — including any surrounding whitespace — as the span end,
    # overshooting the match; use the stripped phrase's length instead.
    frase_norm = frase.strip()
    idx = full_text.lower().find(frase_norm.lower())
    if idx == -1:
        return None
    # NOTE(review): offsets from the lowercased text are reused against the
    # original text; assumes .lower() is length-preserving here — confirm if
    # the corpus can contain exotic Unicode (e.g. 'İ').
    return expand_to_article_bounds(full_text, idx, idx + len(frase_norm))
| |
|
def rag_retrieve_and_expand(pregunta, top_k=5):
    """Semantic retrieval expanded to whole articles.

    Embeds *pregunta*, fetches the top_k closest chunks from the Chroma
    collection, then widens the characters covered by those chunks to
    whole-article boundaries within full_text.

    Returns the expanded segment, or None when nothing was retrieved or the
    expansion came back empty.
    """
    q_emb = embedding_model.encode([pregunta], convert_to_numpy=True)[0].tolist()
    results = collection.query(
        query_embeddings=[q_emb],
        n_results=top_k,
        include=['documents', 'metadatas', 'distances'],
    )
    docs = results.get('documents', [[]])[0]
    metas = results.get('metadatas', [[]])[0]
    if not docs:
        return None

    # Recover each chunk's start offset exactly once. The original parsed the
    # metadata twice with inconsistent error handling — the second pass could
    # raise on a malformed 'start' value that the first pass tolerated.
    starts = []
    for meta in metas:
        try:
            starts.append(int(meta.get('start', 0)))
        except Exception:
            starts.append(0)

    min_pos = min(starts) if starts else 0
    # End of the furthest-reaching retrieved chunk; length-based fallback if
    # nothing usable came back in the metadata.
    ends = [s + len(doc) for s, doc in zip(starts, docs)]
    max_pos = max(ends) if ends else (min_pos + sum(len(d) for d in docs))

    segmento = expand_to_article_bounds(full_text, min_pos, max_pos)
    return segmento if segmento.strip() else None
| |
|
| |
|
| | |
| |
|
def obtener_respuesta_avanzada(pregunta, modo="Artículo exacto", top_k=5):
    """Dispatch a user query to one of the three search modes.

    Parameters
    ----------
    pregunta : str | None
        Question or exact phrase typed by the user.
    modo : str
        "Artículo exacto", "RAG (artículos expandidos)" or "RAG + QA".
    top_k : int | float
        Number of chunks to retrieve in the RAG modes. Gradio sliders can
        deliver floats, so the value is coerced to an int >= 1 here.

    Returns
    -------
    str
        The article / answer text, or a user-facing status message.
    """
    pregunta = (pregunta or "").strip()
    if not pregunta:
        return "Escriba una pregunta o frase para buscar."

    # Slider values may arrive as floats (e.g. 5.0); chroma wants an int.
    top_k = max(1, int(top_k))

    if modo == "Artículo exacto":
        exact = buscar_articulo_exacto(pregunta)
        return exact if exact else "No se encontró coincidencia exacta en el texto."

    if modo == "RAG (artículos expandidos)":
        seg = rag_retrieve_and_expand(pregunta, top_k=top_k)
        return seg if seg else "No se encontraron fragmentos relevantes."

    if modo == "RAG + QA":
        seg = rag_retrieve_and_expand(pregunta, top_k=top_k)
        if not seg:
            return "No se encontraron fragmentos relevantes para generar respuesta."
        # The QA model has a limited context window; truncate defensively.
        context = seg[:3000]
        try:
            out = qa_pipeline(question=pregunta, context=context)
            answer = out.get('answer', '').strip()
            score = out.get('score', 0.0)
            return f"{answer}\n\n(Confianza: {score:.3f})\n\nFuente (fragmento):\n{seg[:3000]}"
        except Exception as e:
            return f"Error en QA pipeline: {str(e)}"

    return "Modo no reconocido."
| |
|
| | |
# --- Gradio UI -----------------------------------------------------------
with gr.Blocks(title="Asistente Reglamentos UNAB (RAG avanzado)") as demo:
    gr.Markdown("Asistente Reglamentos UNAB")
    gr.Markdown("Selecciona modo: 'Artículo exacto' devuelve el artículo completo si la frase aparece textualmente; 'RAG (artículos expandidos)' busca semánticamente y expande al artículo; 'RAG + QA' devuelve respuesta extractiva basada en los artículos recuperados.")

    # Input row: question box, mode selector and top-k slider.
    with gr.Row():
        pregunta = gr.Textbox(
            lines=2,
            placeholder="Escriba la pregunta o frase exacta...",
            label="Pregunta / Frase",
        )
        modo = gr.Radio(
            ["Artículo exacto", "RAG (artículos expandidos)", "RAG + QA"],
            value="Artículo exacto",
            label="Modo de búsqueda",
        )
        topk = gr.Slider(1, 10, value=5, step=1, label="Top-k (para RAG)")

    with gr.Row():
        btn = gr.Button("Buscar")
    with gr.Row():
        salida = gr.Textbox(lines=25, label="Resultado (artículo / respuesta)")

    def _dispatch(q, m, k):
        # Thin adapter so the click wiring stays readable.
        return obtener_respuesta_avanzada(q, modo=m, top_k=k)

    btn.click(fn=_dispatch, inputs=[pregunta, modo, topk], outputs=[salida])

    gr.Markdown("---")
    gr.Markdown("### 📄 Ver texto completo extraído")
    with gr.Row():
        show_full_btn = gr.Button("Mostrar texto completo")
        full_out = gr.Textbox(lines=30, label="Texto completo extraído")

    def _show_full_text():
        return full_text or "No hay texto extraído."

    show_full_btn.click(fn=_show_full_text, inputs=None, outputs=full_out)


# share=True exposes a public link (required to reach the app from Colab).
demo.launch(share=True)